Updated README
This commit is contained in:
parent
d2c17869e0
commit
3348e2881c
334
README.md
334
README.md
|
|
@ -1,27 +1,27 @@
|
|||
AMQP-CPP
|
||||
========
|
||||
|
||||
AMQP-CPP is a C++ library for communicating with a RabbitMQ message broker. The
|
||||
library can be used to parse incoming data from a RabbitMQ server, and to
|
||||
AMQP-CPP is a C++ library for communicating with a RabbitMQ message broker. The
|
||||
library can be used to parse incoming data from a RabbitMQ server, and to
|
||||
generate frames that can be sent to a RabbitMQ server.
|
||||
|
||||
Unlike all other AMQP libraries, this AMQP-CPP library does not make a connection to
|
||||
RabbitMQ by itself, nor does it create sockets and/or performs IO operations. As
|
||||
a user of this library, you first need to set up a socket connection
|
||||
to RabbitMQ by yourself, and implement a certain interface that you pass to the
|
||||
RabbitMQ by itself, nor does it create sockets and/or performs IO operations. As
|
||||
a user of this library, you first need to set up a socket connection
|
||||
to RabbitMQ by yourself, and implement a certain interface that you pass to the
|
||||
AMQP-CPP library and that the library will use for IO operations.
|
||||
|
||||
This architecture makes the library extremely flexible: it does not rely on
|
||||
operating system specific IO calls, and it can be easily integrated into any
|
||||
event loop. It is fully asynchronous and does not do any blocking (system) calls,
|
||||
event loop. It is fully asynchronous and does not do any blocking (system) calls,
|
||||
so it can be used in high performance applications without the need for threads.
|
||||
|
||||
|
||||
ABOUT
|
||||
=====
|
||||
|
||||
This library is created and maintained by Copernica (www.copernica.com), and is
|
||||
used inside the MailerQ (www.mailerq.com) application, MailerQ is a tool for
|
||||
This library is created and maintained by Copernica (www.copernica.com), and is
|
||||
used inside the MailerQ (www.mailerq.com) application, MailerQ is a tool for
|
||||
sending large volumes of email, using AMQP message queues.
|
||||
|
||||
|
||||
|
|
@ -42,34 +42,34 @@ class MyConnectionHandler : public AMQP::ConnectionHandler
|
|||
{
|
||||
/**
|
||||
* Method that is called by the AMQP library every time it has data
|
||||
* available that should be sent to RabbitMQ.
|
||||
* @param connection pointer to the main connection object
|
||||
* available that should be sent to RabbitMQ.
|
||||
* @param connection pointer to the main connection object
|
||||
* @param data memory buffer with the data that should be sent to RabbitMQ
|
||||
* @param size size of the buffer
|
||||
*/
|
||||
virtual void onData(AMQP::Connection *connection, const char *data, size_t size)
|
||||
{
|
||||
// @todo
|
||||
// @todo
|
||||
// Add your own implementation, for example by doing a call to the
|
||||
// send() system call. But be aware that the send() call may not
|
||||
// send all data at once, so you also need to take care of buffering
|
||||
// the bytes that could not immediately be sent, and try to send
|
||||
// the bytes that could not immediately be sent, and try to send
|
||||
// them again when the socket becomes writable again
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Method that is called by the AMQP library when the login attempt
|
||||
* succeeded. After this method has been called, the connection is ready
|
||||
* Method that is called by the AMQP library when the login attempt
|
||||
* succeeded. After this method has been called, the connection is ready
|
||||
* to use.
|
||||
* @param connection The connection that can now be used
|
||||
*/
|
||||
virtual void onConnected(AMQP::Connection *connection)
|
||||
{
|
||||
// @todo
|
||||
// add your own implementation, for example by creating a channel
|
||||
// add your own implementation, for example by creating a channel
|
||||
// instance, and start publishing or consuming
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Method that is called by the AMQP library when a fatal error occurs
|
||||
* on the connection, for example because data received from RabbitMQ
|
||||
|
|
@ -81,7 +81,7 @@ class MyConnectionHandler : public AMQP::ConnectionHandler
|
|||
{
|
||||
// @todo
|
||||
// add your own implementation, for example by reporting the error
|
||||
// to the user of your program, log the error, and destruct the
|
||||
// to the user of your program, log the error, and destruct the
|
||||
// connection object because it is no longer in a usable state
|
||||
}
|
||||
};
|
||||
|
|
@ -108,18 +108,18 @@ channel.bindQueue("my-exchange", "my-queue");
|
|||
A number of remarks about the example above. First you may have noticed that we've
|
||||
created all objects on the stack. You are of course also free to create them
|
||||
on the heap with the C++ operator 'new'. That works just as well, and is in real
|
||||
life code probably more useful as you normally want to keep your handlers, connection
|
||||
life code probably more useful as you normally want to keep your handlers, connection
|
||||
and channel objects around for a much longer time.
|
||||
|
||||
But more importantly, you can see in the example above that we have created the
|
||||
channel object directly after we made the connection object, and we also
|
||||
But more importantly, you can see in the example above that we have created the
|
||||
channel object directly after we made the connection object, and we also
|
||||
started declaring exchanges and queues right away. However, under the hood, a handshake
|
||||
protocol is executed between the server and the client when the Connection
|
||||
object is first created. During this handshake procedure it is not permitted to send
|
||||
other instructions (like opening a channel or declaring a queue). It would therefore have been better
|
||||
if we had first waited for the connection to be ready (implement the MyConnectionHandler::onConnected() method),
|
||||
and create the channel object only then. But this is not strictly necessary.
|
||||
The methods that are called before the handshake is completed are cached by the
|
||||
other instructions (like opening a channel or declaring a queue). It would therefore have been better
|
||||
if we had first waited for the connection to be ready (implement the MyConnectionHandler::onConnected() method),
|
||||
and create the channel object only then. But this is not strictly necessary.
|
||||
The methods that are called before the handshake is completed are cached by the
|
||||
AMQP library and will be executed the moment the handshake is completed and the
|
||||
connection becomes ready for use.
|
||||
|
||||
|
|
@ -128,15 +128,15 @@ PARSING INCOMING DATA
|
|||
=====================
|
||||
|
||||
The ConnectionHandler class has a method onData() that is called by the library
|
||||
every time that it wants to send out data. We've explained that it is up to you to
|
||||
implement that method. But what about data in the other direction? How does the
|
||||
every time that it wants to send out data. We've explained that it is up to you to
|
||||
implement that method. But what about data in the other direction? How does the
|
||||
library receive data back from RabbitMQ?
|
||||
|
||||
As we've explained above, the AMQP-CPP library does not do any IO by itself
|
||||
and it is therefore of course also not possible for the library to receive data from
|
||||
a socket. It is again up to you to do this. If, for example, you notice in your
|
||||
event loop that the socket that is connected with the RabbitMQ server becomes
|
||||
readable, you should read out that data (for example by using the recv() system
|
||||
As we've explained above, the AMQP-CPP library does not do any IO by itself
|
||||
and it is therefore of course also not possible for the library to receive data from
|
||||
a socket. It is again up to you to do this. If, for example, you notice in your
|
||||
event loop that the socket that is connected with the RabbitMQ server becomes
|
||||
readable, you should read out that data (for example by using the recv() system
|
||||
call), and pass the received bytes to the AMQP-CPP library. This is done by
|
||||
calling the parse() method in the Connection object.
|
||||
|
||||
|
|
@ -147,7 +147,7 @@ The code snippet below comes from the Connection.h C++ header file.
|
|||
````c++
|
||||
/**
|
||||
* Parse data that was recevied from RabbitMQ
|
||||
*
|
||||
*
|
||||
* Every time that data comes in from RabbitMQ, you should call this method to parse
|
||||
* the incoming data, and let it handle by the AMQP-CPP library. This method returns the number
|
||||
* of bytes that were processed.
|
||||
|
|
@ -170,36 +170,42 @@ size_t parse(char *buffer, size_t size)
|
|||
CHANNELS
|
||||
========
|
||||
|
||||
In the example you saw that we created a channel object. A channel is a virtual
|
||||
connection over a single TCP connection, and it is possible to create many channels
|
||||
In the example you saw that we created a channel object. A channel is a virtual
|
||||
connection over a single TCP connection, and it is possible to create many channels
|
||||
that all use the same TCP connection.
|
||||
|
||||
AMQP instructions are always sent over a channel, so before you can send the first
|
||||
command to the RabbitMQ server, you first need a channel object. The channel
|
||||
AMQP instructions are always sent over a channel, so before you can send the first
|
||||
command to the RabbitMQ server, you first need a channel object. The channel
|
||||
object has many methods to send instructions to the RabbitMQ server. It for
|
||||
example has methods to declare queues and exchanges, to bind and unbind them,
|
||||
and to publish and consume messages. You can best take a look at the channel.h
|
||||
C++ header file for a list of all available methods. Every method in it is well
|
||||
example has methods to declare queues and exchanges, to bind and unbind them,
|
||||
and to publish and consume messages. You can best take a look at the channel.h
|
||||
C++ header file for a list of all available methods. Every method in it is well
|
||||
documented.
|
||||
|
||||
The constructor of the Channel object accepts two parameters: the connection object,
|
||||
and a pointer to a ChannelHandler object. In the example we did
|
||||
not use this ChannelHandler object. However, in normal circumstances, you should
|
||||
always pass a pointer to a ChannelHandler object every time you construct a channel.
|
||||
|
||||
Just like the ConnectionHandler class, the ChannelHandler class is a base class that
|
||||
you can extend to override the virtual methods you need. The AMQP library
|
||||
will call these methods to inform you that an operation on the channel has succeeded
|
||||
or has failed.
|
||||
The constructor of the Channel object accepts one parameter: the connection object.
|
||||
Unlike the connection it does not accept a handler. Instead of this (almost) every
|
||||
function in the channel returns a Deferred object. This deferred object can be used
|
||||
to install handlers to be called in case of success, failure or in either case.
|
||||
|
||||
For example, if you call the channel.declareQueue() method, the AMQP-CPP library will
|
||||
send a message to the RabbitMQ message broker to ask it to declare the
|
||||
queue. However, because all operations in the library are asynchronous, the
|
||||
declareQueue() method immediately returns 'true', although it is at that time
|
||||
not yet known whether the queue was correctly declared. Only after a while,
|
||||
after the instruction has reached the server, and the confirmation from the server
|
||||
has been sent back to the client, your ChannelHandler::onQueueDeclared()
|
||||
method will be called to inform you that the operation was succesful.
|
||||
after the instruction has reached the server, and the confirmation from the server
|
||||
has been sent back to the client, your handler method will be called to inform
|
||||
you that the operation was succesful.
|
||||
|
||||
````c++
|
||||
Channel myChannel(&connection);
|
||||
myChannel.declareQueue("my-queue")
|
||||
.onSuccess([](AMQP::Channel *channel, const std::string& name, uint32_t messageCount, uint32_t consumerCount) {
|
||||
// by now the queue is created
|
||||
})
|
||||
.onError([](AMQP::Channel *channel, const std::string message) {
|
||||
// something went wrong creating the channel
|
||||
});
|
||||
````
|
||||
|
||||
It is important to realize that any error that occurs on a channel, will
|
||||
invalidate the entire channel,. including all subsequent instructions that
|
||||
|
|
@ -212,9 +218,9 @@ myChannel.declareQueue("my-queue");
|
|||
myChannel.declareExchange("my-exchange");
|
||||
````
|
||||
|
||||
If the first declareQueue() call fails in the example above, your ChannelHandler::onError()
|
||||
method will be called after a while to report this failure. And although the
|
||||
second instruction to declare an exchange has already been sent to the server, it will be
|
||||
If the first declareQueue() call fails in the example above, your Deferred::onError()
|
||||
method will be called after a while to report this failure. And although the
|
||||
second instruction to declare an exchange has already been sent to the server, it will be
|
||||
ignored because the channel was already in an invalid state after the first failure.
|
||||
|
||||
You can overcome this by using multiple channels:
|
||||
|
|
@ -223,7 +229,7 @@ You can overcome this by using multiple channels:
|
|||
Channel channel1(connection, &myHandler);
|
||||
Channel channel2(connection, &myHandler);
|
||||
channel1.declareQueue("my-queue");
|
||||
channel2.declareQueue("my-exchange");
|
||||
channel2.declareExchange("my-exchange");
|
||||
````
|
||||
|
||||
Now, if an error occurs with declaring the queue, it will not have
|
||||
|
|
@ -233,51 +239,6 @@ RabbitMQ server, so some extra bytes are sent over the network,
|
|||
and some additional resources in both the client application and the
|
||||
RabbitMQ server are used (although this is all very limited).
|
||||
|
||||
Let's get back to the ChannelHandler class. It has many methods that you can
|
||||
implement - all of which are optional. All methods in it have a default empty implementation,
|
||||
so you can choose to only override the ones that you are interested in. When you're
|
||||
writing a consumer application for example, you probably are only interested in
|
||||
errors that occur, and in incoming messages:
|
||||
|
||||
````c++
|
||||
#include <amqpcpp.h>
|
||||
|
||||
class MyChannelHandler : public AMQP::ChannelHandler
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* Method that is called when an error occurs on the channel, and
|
||||
* the channel ends up in an error state
|
||||
* @param channel the channel on which the error occured
|
||||
* @param message human readable error message
|
||||
*/
|
||||
virtual void onError(AMQP::Channel *channel, const std::string &message)
|
||||
{
|
||||
// @todo
|
||||
// do something with the error message (like reporting it to the end-user)
|
||||
// and destruct the channel object because it now no longer is usable
|
||||
}
|
||||
|
||||
/**
|
||||
* Method that is called when a message has been received on a channel
|
||||
* This message will be called for every message that is received after
|
||||
* you started consuming. Make sure you acknowledge the messages when its
|
||||
* safe to remove them from RabbitMQ (unless you set no-ack option when you
|
||||
* started the consumer)
|
||||
* @param channel the channel on which the consumer was started
|
||||
* @param message the consumed message
|
||||
* @param deliveryTag the delivery tag, you need this to acknowledge the message
|
||||
* @param consumerTag the consumer identifier that was used to retrieve this message
|
||||
* @param redelivered is this a redelivered message?
|
||||
*/
|
||||
virtual void onReceived(AMQP::Channel *channel, const AMQP::Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered)
|
||||
{
|
||||
// @todo
|
||||
// do something with the incoming message
|
||||
}
|
||||
};
|
||||
````
|
||||
|
||||
FLAGS AND TABLES
|
||||
================
|
||||
|
||||
|
|
@ -290,30 +251,30 @@ tables are used by many methods.
|
|||
````c++
|
||||
/**
|
||||
* Declare a queue
|
||||
*
|
||||
*
|
||||
* If you do not supply a name, a name will be assigned by the server.
|
||||
*
|
||||
*
|
||||
* The flags can be a combination of the following values:
|
||||
*
|
||||
*
|
||||
* - durable queue survives a broker restart
|
||||
* - autodelete queue is automatically removed when all connected consumers are gone
|
||||
* - passive only check if the queue exist
|
||||
* - exclusive the queue only exists for this connection, and is automatically removed when connection is gone
|
||||
*
|
||||
*
|
||||
* @param name name of the queue
|
||||
* @param flags combination of flags
|
||||
* @param arguments optional arguments
|
||||
*/
|
||||
bool declareQueue(const std::string &name, int flags, const AMQP::Table &arguments);
|
||||
bool declareQueue(const std::string &name, const AMQP::Table &arguments);
|
||||
bool declareQueue(const std::string &name, int flags = 0);
|
||||
bool declareQueue(int flags, const AMQP::Table &arguments);
|
||||
bool declareQueue(const AMQP::Table &arguments);
|
||||
bool declareQueue(int flags = 0);
|
||||
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const std::string &name, int flags, const Table &arguments);
|
||||
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const std::string &name, const Table &arguments);
|
||||
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const std::string &name, int flags = 0);
|
||||
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(int flags, const Table &arguments);
|
||||
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const Table &arguments);
|
||||
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(int flags = 0);
|
||||
````
|
||||
|
||||
As you can see, the method comes in many forms, and it is up to you to choose
|
||||
the one that is most appropriate. We now take a look at the most complete
|
||||
the one that is most appropriate. We now take a look at the most complete
|
||||
one, the method with three parameters.
|
||||
|
||||
Many methods in the Channel class accept an integer parameter named 'flags'.
|
||||
|
|
@ -324,13 +285,13 @@ AMQP::durable + AMQP::autodelete.
|
|||
|
||||
The declareQueue() method also accepts a parameter named 'arguments', which is of type
|
||||
Table. This Table object can be used as an associative array to send additional
|
||||
options to RabbitMQ, that are often custom RabbitMQ extensions to the AMQP
|
||||
options to RabbitMQ, that are often custom RabbitMQ extensions to the AMQP
|
||||
standard. For a list of all supported arguments, take a look at the documentation
|
||||
on the RabbitMQ website. With every new RabbitMQ release more features, and
|
||||
supported arguments are added.
|
||||
|
||||
The Table class is a very powerful class that enables you to build
|
||||
complicated, deeply nested structures full of strings, arrays and even other
|
||||
The Table class is a very powerful class that enables you to build
|
||||
complicated, deeply nested structures full of strings, arrays and even other
|
||||
tables. In reality, you only need strings and integers.
|
||||
|
||||
````c++
|
||||
|
|
@ -354,13 +315,13 @@ you're publishing - all these parameters are standard C++ strings.
|
|||
|
||||
More extended versions of the publish() method exist that accept additional
|
||||
arguments, and that enable you to publish entire Envelope objects, which are
|
||||
objects that contain the message plus a list of optional meta information like
|
||||
the content-type, content-encoding, priority, expire time and more. None of these
|
||||
meta fields are interpreted by this library, and also the RabbitMQ ignores most
|
||||
of them, but the AMQP protocol defines them, and they are free for you to use.
|
||||
For an extensive list of the fields that are supported, take a look at the MetaData.h
|
||||
header file (MetaData is the base class for Envelope). You should also check the
|
||||
RabbitMQ documentation to find out if an envelope header is interpreted by the
|
||||
objects that contain the message plus a list of optional meta information like
|
||||
the content-type, content-encoding, priority, expire time and more. None of these
|
||||
meta fields are interpreted by this library, and also the RabbitMQ ignores most
|
||||
of them, but the AMQP protocol defines them, and they are free for you to use.
|
||||
For an extensive list of the fields that are supported, take a look at the MetaData.h
|
||||
header file (MetaData is the base class for Envelope). You should also check the
|
||||
RabbitMQ documentation to find out if an envelope header is interpreted by the
|
||||
RabbitMQ server (at the time of this writing, only the expire time is being used).
|
||||
|
||||
The following snippet is copied from the Channel.h header file and lists all
|
||||
|
|
@ -370,17 +331,17 @@ in almost any form:
|
|||
````c++
|
||||
/**
|
||||
* Publish a message to an exchange
|
||||
*
|
||||
*
|
||||
* The following flags can be used
|
||||
*
|
||||
*
|
||||
* - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method
|
||||
* - immediate if set, a message that could not immediately be consumed is returned to the onReturned method
|
||||
*
|
||||
*
|
||||
* If either of the two flags is set, and the message could not immediately
|
||||
* be published, the message is returned by the server to the client. If you
|
||||
* want to catch such returned messages, you need to implement the
|
||||
* want to catch such returned messages, you need to implement the
|
||||
* ChannelHandler::onReturned() method.
|
||||
*
|
||||
*
|
||||
* @param exchange the exchange to publish to
|
||||
* @param routingkey the routing key
|
||||
* @param flags optional flags (see above)
|
||||
|
|
@ -396,18 +357,29 @@ bool publish(const std::string &exchange, const std::string &routingKey, int fla
|
|||
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size);
|
||||
````
|
||||
|
||||
Published messages are normally not confirmed by the server, hence there is no
|
||||
ChannelHandler::onPublished() method that you can implement to find out if
|
||||
a message was correctly received by the server. That's by design in the
|
||||
AMQP protocol, to not unnecessarily slow down message publishing. As long
|
||||
as no error is reported via the ChannelHandler::onError() method, you can safely
|
||||
Published messages are normally not confirmed by the server, therefore the publish method does not
|
||||
return a deferred. That's by design in the AMQP protocol, to not unnecessarily slow down message
|
||||
publishing. As long as no error is reported via the ChannelHandler::onError() method, you can safely
|
||||
assume that your messages were delivered.
|
||||
|
||||
If you use the flags parameter to set either the option 'mandatory' or
|
||||
'immediate', a message that could not be routed or directly delivered to a consumer
|
||||
is sent back to the client, and ends up in the ChannelHandler::onReturned()
|
||||
method. At the time of this writing however, the 'immediate' option does not
|
||||
seem to be supported by RabbitMQ.
|
||||
This can of course be a problem when you are publishing many messages. If you get an error halfway through
|
||||
there is no way to know for sure how many messages made it to the broker and how many should be republished.
|
||||
If this is important, you can wrap the publish commands inside a transaction. In this case, if an error occurs,
|
||||
the transaction is automatically rolled back by RabbitMQ and none of the messages are actually published.
|
||||
|
||||
````c++
|
||||
channel.startTransaction();
|
||||
channel.publish("my-exchange", "my-key", "my first message");
|
||||
channel.publish("my-exchange", "my-key", "another message");
|
||||
channel.commitTransaction()
|
||||
.onSuccess([](AMQP::Channel *channel) {
|
||||
// all messages were successfully published
|
||||
})
|
||||
.onError([](AMQP::Channel *channel) {
|
||||
// none of the messages were published
|
||||
// now we have to do it all over again
|
||||
});
|
||||
````
|
||||
|
||||
|
||||
CONSUMING MESSAGES
|
||||
|
|
@ -430,80 +402,63 @@ The full documentation from the C++ Channel.h headerfile looks like this:
|
|||
````c++
|
||||
/**
|
||||
* Tell the RabbitMQ server that we're ready to consume messages
|
||||
*
|
||||
*
|
||||
* After this method is called, RabbitMQ starts delivering messages to the client
|
||||
* application. The consume tag is a string identifier that will be passed to
|
||||
* each received message, so that you can associate incoming messages with a
|
||||
* each received message, so that you can associate incoming messages with a
|
||||
* consumer. If you do not specify a consumer tag, the server will assign one
|
||||
* for you.
|
||||
*
|
||||
*
|
||||
* The following flags are supported:
|
||||
*
|
||||
*
|
||||
* - nolocal if set, messages published on this channel are not also consumed
|
||||
* - noack if set, consumed messages do not have to be acked, this happens automatically
|
||||
* - exclusive request exclusive access, only this consumer can access the queue
|
||||
* - nowait the server does not have to send a response back that consuming is active
|
||||
*
|
||||
* The method ChannelHandler::onConsumerStarted() will be called when the
|
||||
*
|
||||
* The method ChannelHandler::onConsumerStarted() will be called when the
|
||||
* consumer has started (unless the nowait option was set, in which case
|
||||
* no confirmation method is called)
|
||||
*
|
||||
*
|
||||
* @param queue the queue from which you want to consume
|
||||
* @param tag a consumer tag that will be associated with this consume operation
|
||||
* @param flags additional flags
|
||||
* @param arguments additional arguments
|
||||
* @return bool
|
||||
*/
|
||||
bool consume(const std::string &queue, const std::string &tag, int flags, const AMQP::Table &arguments);
|
||||
bool consume(const std::string &queue, const std::string &tag, int flags = 0);
|
||||
bool consume(const std::string &queue, const std::string &tag, const AMQP::Table &arguments);
|
||||
bool consume(const std::string &queue, int flags, const AMQP::Table &arguments);
|
||||
bool consume(const std::string &queue, int flags = 0);
|
||||
bool consume(const std::string &queue, const AMQP::Table &arguments);
|
||||
DeferredConsumer& consume(const std::string &queue, const std::string &tag, int flags, const AMQP::Table &arguments);
|
||||
DeferredConsumer& consume(const std::string &queue, const std::string &tag, int flags = 0);
|
||||
DeferredConsumer& consume(const std::string &queue, const std::string &tag, const AMQP::Table &arguments);
|
||||
DeferredConsumer& consume(const std::string &queue, int flags, const AMQP::Table &arguments);
|
||||
DeferredConsumer& consume(const std::string &queue, int flags = 0);
|
||||
DeferredConsumer& consume(const std::string &queue, const AMQP::Table &arguments);
|
||||
````
|
||||
|
||||
In your ChannelHandler you can override the onConsumerStarted() method, that will be
|
||||
first called before any messages are sent to you. Most users choose not to override this
|
||||
method, because there is not much useful to do in it. After the consumer has started, however,
|
||||
messages are starting to be sent from RabbitMQ to your client application, and they are all
|
||||
passed to the ChannelHandler::onReceived() method. This method is thus very important to implement.
|
||||
As you can see, the consume method returns a DeferredConsumer. This object is a regular Deferred, with the
|
||||
addition of the onReceived method. This method can be used to retrieve incoming messages after consumption
|
||||
has begun.
|
||||
|
||||
|
||||
````c++
|
||||
class MyChannelHandler : public AMQP::ChannelHandler
|
||||
{
|
||||
/**
|
||||
* Method that is called when a message has been received on a channel
|
||||
* This message will be called for every message that is received after
|
||||
* you started consuming. Make sure you acknowledge the messages when its
|
||||
* safe to remove them from RabbitMQ (unless you set no-ack option when you
|
||||
* started the consumer)
|
||||
* @param channel the channel on which the consumer was started
|
||||
* @param message the consumed message
|
||||
* @param deliveryTag the delivery tag, you need this to acknowledge the message
|
||||
* @param consumerTag the consumer identifier that was used to retrieve this message
|
||||
* @param redelivered is this a redelivered message?
|
||||
*/
|
||||
virtual void onReceived(AMQP::Channel *channel, const AMQP::Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered)
|
||||
{
|
||||
// @todo
|
||||
// add your own processing
|
||||
|
||||
|
||||
// after the message was processed, acknowledge it
|
||||
channel->ack(deliveryTag);
|
||||
}
|
||||
}
|
||||
channel.consume("my-queue").onReceived([](AMQP::Channel *channel, const AMQP::Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) {
|
||||
// @todo
|
||||
// add your own processing
|
||||
|
||||
|
||||
// after the message was processed, acknowledge it
|
||||
channel->ack(deliveryTag);
|
||||
});
|
||||
````
|
||||
|
||||
The Message object holds all information of the delivered message: the actual content,
|
||||
The Message object holds all information of the delivered message: the actual content,
|
||||
all meta information from the envelope (in fact, the Message class is derived from the Envelope class),
|
||||
and even the name of the exchange and the routing key that were used when the message was originally
|
||||
published. For a full list of all information in the Message class, you best have a look at the
|
||||
message.h, envelope.h and metadata.h header files.
|
||||
|
||||
Another important parameter to the onReceived() method is the deliveryTag parameter. This is a
|
||||
Another important parameter to the onReceived() method is the deliveryTag parameter. This is a
|
||||
unique identifier that you need to acknowledge an incoming message. RabbitMQ only removes the
|
||||
message after it has been acknowledged, so that if your application crashes while it was busy
|
||||
message after it has been acknowledged, so that if your application crashes while it was busy
|
||||
processing the message, the message does not get lost but remains in the queue. But this means that
|
||||
after you've processed the message, you must inform RabbitMQ about it by calling the Channel:ack()
|
||||
method. This method is very simple and takes in its simplest form only one parameter: the
|
||||
|
|
@ -518,10 +473,10 @@ or the entire TCP connection, consuming also stops.
|
|||
|
||||
RabbitMQ throttles the number of messages that are delivered to you, to prevent that your application
|
||||
is flooded with messages from the queue, and to spread out the messages over multiple consumers.
|
||||
This is done with a setting called quality-of-service (QOS). The QOS setting is a numeric value which
|
||||
holds the number of unacknowledged messages that you are allowed to have. RabbitMQ stops sending
|
||||
additional messages when the number of unacknowledges messages has reached this limit, and only
|
||||
sends additional messages when an earlier message gets acknowledged. To change the QOS, you can
|
||||
This is done with a setting called quality-of-service (QOS). The QOS setting is a numeric value which
|
||||
holds the number of unacknowledged messages that you are allowed to have. RabbitMQ stops sending
|
||||
additional messages when the number of unacknowledges messages has reached this limit, and only
|
||||
sends additional messages when an earlier message gets acknowledged. To change the QOS, you can
|
||||
simple call Channel::setQos().
|
||||
|
||||
|
||||
|
|
@ -534,11 +489,12 @@ need additional attention:
|
|||
- ability to set up secure connections (or is this fully done on the IO level)
|
||||
- login with other protocols than login/password
|
||||
- publish confirms
|
||||
- returned messages
|
||||
|
||||
We also need to add more safety checks so that strange or invalid data from
|
||||
RabbitMQ does not break the library (although in reality RabbitMQ only sends
|
||||
We also need to add more safety checks so that strange or invalid data from
|
||||
RabbitMQ does not break the library (although in reality RabbitMQ only sends
|
||||
valid data). Also, when we now receive an answer from RabbitMQ that does not
|
||||
match the request that we sent before, we do not report an error (this is also
|
||||
match the request that we sent before, we do not report an error (this is also
|
||||
an issue that only occurs in theory).
|
||||
|
||||
It would be nice to have sample implementations for the ConnectionHandler
|
||||
|
|
|
|||
Loading…
Reference in New Issue