update documentation, error callbacks now get a const char * instead of a std::string

This commit is contained in:
Emiel Bruijntjes 2014-04-15 14:22:30 +02:00
parent 3d4a1b865e
commit 82249ee368
4 changed files with 271 additions and 120 deletions

385
README.md
View File

@ -77,13 +77,24 @@ class MyConnectionHandler : public AMQP::ConnectionHandler
* @param connection The connection on which the error occured
* @param message A human readable error message
*/
virtual void onError(AMQP::Connection *connection, const std::string &message)
virtual void onError(AMQP::Connection *connection, const char *message)
{
// @todo
// add your own implementation, for example by reporting the error
// to the user of your program, log the error, and destruct the
// connection object because it is no longer in a usable state
}
/**
* Method that is called when the connection was closed. This is the
* counter part of a call to Connection::close() and it confirms that the
* connection was correctly closed.
*
* @param connection The connection that was closed and that is now unusable
*/
virtual void onClosed(AMQP::Connection *connection) {}
};
````
After you've implemented the ConnectionHandler class, you can start using
@ -132,13 +143,13 @@ every time that it wants to send out data. We've explained that it is up to you
implement that method. But what about data in the other direction? How does the
library receive data back from RabbitMQ?
As we've explained above, the AMQP-CPP library does not do any IO by itself
and it is therefore of course also not possible for the library to receive data from
a socket. It is again up to you to do this. If, for example, you notice in your
event loop that the socket that is connected with the RabbitMQ server becomes
readable, you should read out that data (for example by using the recv() system
call), and pass the received bytes to the AMQP-CPP library. This is done by
calling the parse() method in the Connection object.
The AMQP-CPP library does not do any IO by itself and it is therefore of course
also not possible for the library to receive data from a socket. It is again up
to you to do this. If, for example, you notice in your event loop that the socket
that is connected with the RabbitMQ server becomes readable, you should read out
that socket (for example by using the recv() system call), and pass the received
bytes to the AMQP-CPP library. This is done by calling the parse() method in the
Connection object.
The Connection::parse() method gets two parameters, a pointer to a buffer of
data received from RabbitMQ, and a parameter that holds the size of this buffer.
@ -149,13 +160,13 @@ The code snippet below comes from the Connection.h C++ header file.
* Parse data that was recevied from RabbitMQ
*
* Every time that data comes in from RabbitMQ, you should call this method to parse
* the incoming data, and let it handle by the AMQP-CPP library. This method returns the number
* of bytes that were processed.
* the incoming data, and let it handle by the AMQP-CPP library. This method returns
* the number of bytes that were processed.
*
* If not all bytes could be processed because it only contained a partial frame, you should
* call this same method later on when more data is available. The AMQP-CPP library does not do
* any buffering, so it is up to the caller to ensure that the old data is also passed in that
* later call.
* If not all bytes could be processed because it only contained a partial frame,
* you should call this same method later on when more data is available. The
* AMQP-CPP library does not do any buffering, so it is up to the caller to ensure
* that the old data is also passed in that later call.
*
* @param buffer buffer to decode
* @param size size of the buffer to decode
@ -167,12 +178,20 @@ size_t parse(char *buffer, size_t size)
}
````
You should do all the book keeping for the buffer yourselves. If you for example
call the Connection::parse() method with a buffer of 100 bytes, and the method
returns that only 60 bytes were processed, you should later call the method again,
with a buffer filled with the remaining 40 bytes. If the method returns 0, you should
make a new call to parse() when more data is available, with a buffer with more
data.
CHANNELS
========
In the example you saw that we created a channel object. A channel is a virtual
connection over a single TCP connection, and it is possible to create many channels
that all use the same TCP connection.
In the example we created a channel object. A channel is a virtual connection over
a single TCP connection, and it is possible to create many channels that all use
the same TCP connection.
AMQP instructions are always sent over a channel, so before you can send the first
command to the RabbitMQ server, you first need a channel object. The channel
@ -183,30 +202,100 @@ C++ header file for a list of all available methods. Every method in it is well
documented.
The constructor of the Channel object accepts one parameter: the connection object.
Unlike the connection it does not accept a handler. Instead of this (almost) every
function in the channel returns a Deferred object. This deferred object can be used
to install handlers to be called in case of success, failure or in either case.
Unlike the connection it does not accept a handler. Instead of a handler object,
(almost) every method of the Channel class returns an instance of the 'Deferred'
class. This object can be used to install handlers that will be called in case
of success or failure.
For example, if you call the channel.declareQueue() method, the AMQP-CPP library will
send a message to the RabbitMQ message broker to ask it to declare the
For example, if you call the channel.declareExchange() method, the AMQP-CPP library
will send a message to the RabbitMQ message broker to ask it to declare the
queue. However, because all operations in the library are asynchronous, the
declareQueue() method immediately returns 'true', although it is at that time
not yet known whether the queue was correctly declared. Only after a while,
after the instruction has reached the server, and the confirmation from the server
has been sent back to the client, your handler method will be called to inform
you that the operation was succesful.
declareExchange() method can not return 'true' or 'false' to inform you whether
the operation was succesful or not. Only after a while, after the instruction
has reached the RabbitMQ server, and the confirmation from the server has been
sent back to the client, the library can report the result of the declareExchange()
call.
To prevent any blocking calls, the channel.declareExchange() method returns a
'Deferred' result object, on which you can set callback functions that will be
called when the operation succeeds or fails.
````c++
// create a channel
Channel myChannel(&connection);
myChannel.declareQueue("my-queue")
.onSuccess([](AMQP::Channel *channel, const std::string& name, uint32_t messageCount, uint32_t consumerCount) {
// by now the queue is created
})
.onError([](AMQP::Channel *channel, const std::string message) {
// something went wrong creating the channel
// declare an exchange, and install callbacks for success and failure
myChannel.declareExchange("my-exchange")
.onSuccess([]() {
// by now the exchange is created
})
.onError([](const char *message) {
// something went wrong creating the exchange
});
````
As you can see in the above example, we call the declareExchange() method, and
treat its return value as an object, on which we immediately install a lambda
callback function to handle success, and to handle failure.
Installing the callback methods is optional. If you're not interested in the
result of an operation, you do not have to install a callback for it. Next
to the onSuccess() and onError() callbacks that can be installed, you can also
install a onFinalize() method that gets called directly after the onSuccess()
and onError() methods, and that can be used to set a callback that should
run in either case: when the operation succeeds or when it fails.
The signature for the onError() method is always the same: it gets one parameter
with a human readable error message. The onSuccess() function has a different
signature depending on the method that you call. Most onSuccess() functions
(like the one we showed for the declareExchange() method) do not get any
parameters at all. Some specific onSuccess callbacks receive extra parameters
with additional information.
CHANNEL CALLBACKS
=================
As explained, most channel methods return a 'Deferred' object on which you can
install callbacks using the Deferred::onError() and Deferred::onSuccess() methods.
The callbacks that you install on a Deferred object, only apply to one specific
operation. If you want to install a generic error callback for the entire channel,
you can so so by using the Channel::onError() method. Next to the Channel::onError()
method, you can also install a callback to be notified when the channel is ready
for sending the first instruction to RabbitMQ.
````c++
// create a channel
Channel myChannel(connection, &myHandler);
// install a generic channel-error handler that will be called for every
// error that occurs on the channel
myChannel.onError([](const char *message) {
// report error
std::cout << "channel error: " << message << std::endl;
});
// install a generic callback that will be called when the channel is ready
// for sending the first instruction
myChannel.onReady([]() {
// send the first instructions (like publishing messages)
});
````
In theory, you should always use the onReady() function before you send any
other instructions over the channel. In practive however, the AMQP library
caches all instructions that were sent too early, so that you can use the
channel object right after it was constructed.
CHANNEL ERRORS
==============
It is important to realize that any error that occurs on a channel, will
invalidate the entire channel,. including all subsequent instructions that
were already sent over it. This means that if you call multiple methods in a row,
@ -218,10 +307,11 @@ myChannel.declareQueue("my-queue");
myChannel.declareExchange("my-exchange");
````
If the first declareQueue() call fails in the example above, your Deferred::onError()
method will be called after a while to report this failure. And although the
second instruction to declare an exchange has already been sent to the server, it will be
ignored because the channel was already in an invalid state after the first failure.
If the first declareQueue() call fails in the example above, the second
myChannel.declareExchange() method will not be executed, even when this
second instruction was already sent to the server. The second instruction will be
ignored by the RabbitMQ server because the channel was already in an invalid
state after the first failure.
You can overcome this by using multiple channels:
@ -232,12 +322,12 @@ channel1.declareQueue("my-queue");
channel2.declareExchange("my-exchange");
````
Now, if an error occurs with declaring the queue, it will not have
consequences for the other call. But this comes at a small price:
setting up the extra channel requires and extra instruction to be sent to the
RabbitMQ server, so some extra bytes are sent over the network,
and some additional resources in both the client application and the
RabbitMQ server are used (although this is all very limited).
Now, if an error occurs with declaring the queue, it will not have consequences
for the other call. But this comes at a small price: setting up the extra channel
requires and extra instruction to be sent to the RabbitMQ server, so some extra
bytes are sent over the network, and some additional resources in both the client
application and the RabbitMQ server are used (although this is all very limited).
FLAGS AND TABLES
================
@ -265,23 +355,40 @@ tables are used by many methods.
* @param flags combination of flags
* @param arguments optional arguments
*/
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const std::string &name, int flags, const Table &arguments);
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const std::string &name, const Table &arguments);
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const std::string &name, int flags = 0);
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(int flags, const Table &arguments);
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const Table &arguments);
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(int flags = 0);
DeferredQueue &declareQueue(const std::string &name, int flags, const Table &arguments);
DeferredQueue &declareQueue(const std::string &name, const Table &arguments);
DeferredQueue &declareQueue(const std::string &name, int flags = 0);
DeferredQueue &declareQueue(int flags, const Table &arguments);
DeferredQueue &declareQueue(const Table &arguments);
DeferredQueue &declareQueue(int flags = 0);
````
As you can see, the method comes in many forms, and it is up to you to choose
the one that is most appropriate. We now take a look at the most complete
one, the method with three parameters.
Many methods in the Channel class accept an integer parameter named 'flags'.
This is a variable in which you can set a number of options, by summing up
all the options that are described in the documentation. If you for example
want to create a durable, auto-deleted queue, you can pass in the value
AMQP::durable + AMQP::autodelete.
All above methods returns a 'DeferredQueue' object. The DeferredQueue class
extends from the AMQP::Deferred class and allows you to install a more powerful
onSuccess() callback function. The 'onSuccess' method for the declareQueue()
function gets three arguments:
````c++
// create a custom callback
auto callback = [](const std::string &name, int msgcount, int consumercount) {
// @todo add your own implementation
}
// declare the queue, and install the callback that is called on success
channel.declareQueue("myQueue").onSuccess(callback);
````
Just like many others methods in the Channel class, the declareQueue() method
accept an integer parameter named 'flags'. This is a variable in which you can
set method-specific options, by summing up all the options that are described in
the documentation above the method. If you for example want to create a durable,
auto-deleted queue, you can pass in the value AMQP::durable + AMQP::autodelete.
The declareQueue() method also accepts a parameter named 'arguments', which is of type
Table. This Table object can be used as an associative array to send additional
@ -314,8 +421,8 @@ exchange to publish to, the routing key to use, and the actual message that
you're publishing - all these parameters are standard C++ strings.
More extended versions of the publish() method exist that accept additional
arguments, and that enable you to publish entire Envelope objects, which are
objects that contain the message plus a list of optional meta information like
arguments, and that enable you to publish entire Envelope objects. An envelope
is an object that contains the message plus a list of optional meta information like
the content-type, content-encoding, priority, expire time and more. None of these
meta fields are interpreted by this library, and also the RabbitMQ ignores most
of them, but the AMQP protocol defines them, and they are free for you to use.
@ -334,8 +441,11 @@ in almost any form:
*
* The following flags can be used
*
* - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method
* - immediate if set, a message that could not immediately be consumed is returned to the onReturned method
* - mandatory if set, an unroutable message will be reported to the
* channel handler with the onReturned method
*
* - immediate if set, a message that could not immediately be consumed
* is returned to the onReturned method
*
* If either of the two flags is set, and the message could not immediately
* be published, the message is returned by the server to the client. If you
@ -357,28 +467,39 @@ bool publish(const std::string &exchange, const std::string &routingKey, int fla
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size);
````
Published messages are normally not confirmed by the server, therefore the publish method does not
return a deferred. That's by design in the AMQP protocol, to not unnecessarily slow down message
publishing. As long as no error is reported via the ChannelHandler::onError() method, you can safely
Published messages are normally not confirmed by the server, and the RabbitMQ
will not send a report back to inform us whether the message was succesfully
published or not. Therefore the publish method does also not return a Deferred
object.
As long as no error is reported via the Channel::onError() method, you can safely
assume that your messages were delivered.
This can of course be a problem when you are publishing many messages. If you get an error halfway through
there is no way to know for sure how many messages made it to the broker and how many should be republished.
If this is important, you can wrap the publish commands inside a transaction. In this case, if an error occurs,
the transaction is automatically rolled back by RabbitMQ and none of the messages are actually published.
This can of course be a problem when you are publishing many messages. If you get
an error halfway through there is no way to know for sure how many messages made
it to the broker and how many should be republished. If this is important, you can
wrap the publish commands inside a transaction. In this case, if an error occurs,
the transaction is automatically rolled back by RabbitMQ and none of the messages
are actually published.
````c++
// start a transaction
channel.startTransaction();
// publish a number of messages
channel.publish("my-exchange", "my-key", "my first message");
channel.publish("my-exchange", "my-key", "another message");
// commit the transactions, and set up callbacks that are called when
// the transaction was successful or not
channel.commitTransaction()
.onSuccess([](AMQP::Channel *channel) {
// all messages were successfully published
})
.onError([](AMQP::Channel *channel) {
// none of the messages were published
// now we have to do it all over again
});
.onSuccess([]() {
// all messages were successfully published
})
.onError([]() {
// none of the messages were published
// now we have to do it all over again
});
````
@ -393,9 +514,8 @@ Just like the publish() method that we just described, the consume() method also
comes in many forms. The first parameter is always the name of the queue you like
to consume from. The subsequent parameters are an optional consumer tag, flags and
a table with custom arguments. The first additional parameter, the consumer tag,
is nothing more than a string identifier that will be passed with every consumed message.
This can be useful if you call the consume() methods a number of times to consume
from multiple queues, and you would like to know from which consume call the received messages came.
is nothing more than a string identifier that you can use when you want to stop
consuming.
The full documentation from the C++ Channel.h headerfile looks like this:
@ -411,10 +531,17 @@ The full documentation from the C++ Channel.h headerfile looks like this:
*
* The following flags are supported:
*
* - nolocal if set, messages published on this channel are not also consumed
* - noack if set, consumed messages do not have to be acked, this happens automatically
* - exclusive request exclusive access, only this consumer can access the queue
* - nowait the server does not have to send a response back that consuming is active
* - nolocal if set, messages published on this channel are
* not also consumed
*
* - noack if set, consumed messages do not have to be acked,
* this happens automatically
*
* - exclusive request exclusive access, only this consumer can
* access the queue
*
* - nowait the server does not have to send a response back
* that consuming is active
*
* The method ChannelHandler::onConsumerStarted() will be called when the
* consumer has started (unless the nowait option was set, in which case
@ -426,58 +553,82 @@ The full documentation from the C++ Channel.h headerfile looks like this:
* @param arguments additional arguments
* @return bool
*/
DeferredConsumer& consume(const std::string &queue, const std::string &tag, int flags, const AMQP::Table &arguments);
DeferredConsumer& consume(const std::string &queue, const std::string &tag, int flags = 0);
DeferredConsumer& consume(const std::string &queue, const std::string &tag, const AMQP::Table &arguments);
DeferredConsumer& consume(const std::string &queue, int flags, const AMQP::Table &arguments);
DeferredConsumer& consume(const std::string &queue, int flags = 0);
DeferredConsumer& consume(const std::string &queue, const AMQP::Table &arguments);
DeferredConsumer &consume(const std::string &queue, const std::string &tag, int flags, const AMQP::Table &arguments);
DeferredConsumer &consume(const std::string &queue, const std::string &tag, int flags = 0);
DeferredConsumer &consume(const std::string &queue, const std::string &tag, const AMQP::Table &arguments);
DeferredConsumer &consume(const std::string &queue, int flags, const AMQP::Table &arguments);
DeferredConsumer &consume(const std::string &queue, int flags = 0);
DeferredConsumer &consume(const std::string &queue, const AMQP::Table &arguments);
````
As you can see, the consume method returns a DeferredConsumer. This object is a regular Deferred, with the
addition of the onReceived method. This method can be used to retrieve incoming messages after consumption
has begun.
As you can see, the consume method returns a DeferredConsumer. This object is a
regular Deferred, with the some additions. The onSuccess() method of a
DeferredConsumer is slightly different than the onSuccess() method of a regular
Deferred object: one extra parameter will be supplied to your callback function
with the consumer tag.
The onSuccess() callback will be called when the consume operation _has started_,
but not when messages are actually consumed. For this you will have to install
a different callback, using the onReceived() method.
````c++
channel.consume("my-queue").onReceived([](AMQP::Channel *channel, const AMQP::Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) {
// @todo
// add your own processing
// callback function that is called when the consume operation starts
auto startCb = [](const std::string &consumertag) {
std::cout << "consume operation started" << std::endl;
};
// callback function that is called when the consume operation failed
auto errorCb = [](const char *message) {
std::cout << "consume operation failed" << std::endl;
}
// callback operation when a message was received
auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::cout << "message received" << std::endl;
// acknowledge the message
channel.ack(deliveryTag);
}
// start consuming from the queue, and install the callbacks
channel.consume("my-queue")
.onSuccess(startCb)
.onError(errorCb)
.onReceived(messageCb);
// after the message was processed, acknowledge it
channel->ack(deliveryTag);
});
````
The Message object holds all information of the delivered message: the actual content,
all meta information from the envelope (in fact, the Message class is derived from the Envelope class),
and even the name of the exchange and the routing key that were used when the message was originally
published. For a full list of all information in the Message class, you best have a look at the
The Message object holds all information of the delivered message: the actual
content, all meta information from the envelope (in fact, the Message class is
derived from the Envelope class), and even the name of the exchange and the
routing key that were used when the message was originally published. For a full
list of all information in the Message class, you best have a look at the
message.h, envelope.h and metadata.h header files.
Another important parameter to the onReceived() method is the deliveryTag parameter. This is a
unique identifier that you need to acknowledge an incoming message. RabbitMQ only removes the
message after it has been acknowledged, so that if your application crashes while it was busy
processing the message, the message does not get lost but remains in the queue. But this means that
after you've processed the message, you must inform RabbitMQ about it by calling the Channel:ack()
method. This method is very simple and takes in its simplest form only one parameter: the
Another important parameter to the onReceived() method is the deliveryTag parameter.
This is a unique identifier that you need to acknowledge an incoming message.
RabbitMQ only removes the message after it has been acknowledged, so that if your
application crashes while it was busy processing the message, the message does
not get lost but remains in the queue. But this means that after you've processed
the message, you must inform RabbitMQ about it by calling the Channel:ack() method.
This method is very simple and takes in its simplest form only one parameter: the
deliveryTag of the message.
The consumerTag that you see in the onReceived method() is the same string identifier that was
passed to the Channel::consume() method.
Consuming messages is a continuous process. RabbitMQ keeps sending messages, until
you stop the consumer, which can be done by calling the Channel::cancel() method.
If you close the channel, or the entire TCP connection, consuming also stops.
Consuming messages is a continuous process. RabbitMQ keeps sending messages, until you stop
the consumer, which can be done by calling the Channel::cancel() method. If you close the channel,
or the entire TCP connection, consuming also stops.
RabbitMQ throttles the number of messages that are delivered to you, to prevent that your application
is flooded with messages from the queue, and to spread out the messages over multiple consumers.
This is done with a setting called quality-of-service (QOS). The QOS setting is a numeric value which
holds the number of unacknowledged messages that you are allowed to have. RabbitMQ stops sending
additional messages when the number of unacknowledges messages has reached this limit, and only
sends additional messages when an earlier message gets acknowledged. To change the QOS, you can
simple call Channel::setQos().
RabbitMQ throttles the number of messages that are delivered to you, to prevent
that your application is flooded with messages from the queue, and to spread out
the messages over multiple consumers. This is done with a setting called
quality-of-service (QOS). The QOS setting is a numeric value which holds the number
of unacknowledged messages that you are allowed to have. RabbitMQ stops sending
additional messages when the number of unacknowledges messages has reached this
limit, and only sends additional messages when an earlier message gets acknowledged.
To change the QOS, you can simple call Channel::setQos().
WORK IN PROGRESS

View File

@ -48,7 +48,7 @@ public:
* @param connection The connection that entered the error state
* @param message Error message
*/
virtual void onError(Connection *connection, const std::string &message) {}
virtual void onError(Connection *connection, const char *message) {}
/**
* Method that is called when the login attempt succeeded. After this method

View File

@ -273,7 +273,7 @@ public:
* Report an error message
* @param message
*/
void reportError(const std::string &message)
void reportError(const char *message)
{
// set connection state to closed
_state = state_closed;

View File

@ -157,7 +157,7 @@ public:
// no need to check for a channel, the error is connection wide
// report the error on the connection
connection->reportError(text());
connection->reportError(text().c_str());
// done
return true;