updated docblocks and readme file

This commit is contained in:
Emiel Bruijntjes 2014-01-04 09:20:45 -08:00
parent 7a6527dddf
commit a6fc442bc5
3 changed files with 145 additions and 27 deletions

128
README.md
View File

@ -93,25 +93,26 @@ channel.declareQueue("my-queue");
channel.bindQueue("my-exchange", "my-queue"); channel.bindQueue("my-exchange", "my-queue");
```` ````
A number of remarks about the above example. First you may have noticed that we've A number of remarks about the example above. First you may have noticed that we've
created all objects on the stack. You are of course also free to create them created all objects on the stack. You are of course also free to create them
on the heap with the C++ operator 'new'. That works just as good. on the heap with the C++ operator 'new'. That works just as good.
But more importantly, you see that in the example above we have created the But more importantly, you can see in the example above that we have created the
channel object directly after we created the connection object, and we did also channel object directly after we made the connection object, and we also
start declaring exchanges and queues right away. It would have been better to started declaring exchanges and queues right away. It would have been better if
first wait for the connection to be ready (and the onConnected() method is called we had waited for the connection to be ready, and create the channel object
in your handler object), before you create channel objects and start calling inside the onConnected() method in the MyConnectionHandler class. But this is
methods. But if you insist, you do not have to wait, and you are free to call not strictly necessary. The methods that are called before the connection is
the additional methods right away. ready are cached by the AMQP library and will be executed the moment the
connection becomes ready for use.
As we've explained above, the AMQP library does not do any IO by itself and when it As we've explained above, the AMQP library does not do any IO by itself and when it
needs to send data to RabbitMQ, it will call the onData() method in your handler needs to send data to RabbitMQ, it will call the onData() method in the handler
object. But it is of course also not possible for the library to receive data from object. It is of course also not possible for the library to receive data from
the server. It is again up to you to to this. If, for example, you notice in your the server. It is again up to you to to this. If, for example, you notice in your
event loop that the socket that is connected with the RabbitMQ server becomes event loop that the socket that is connected with the RabbitMQ server becomes
readable, you should read out that data (for example by calling the recv() system readable, you should read out that data (for example by using the recv() system
call), and pass the received bytes to the AMQP library. This can be done by call), and pass the received bytes to the AMQP library. This is done by
calling the parse() method in the Connection object. calling the parse() method in the Connection object.
The Connection::parse() method gets two parameters, a pointer to a buffer of The Connection::parse() method gets two parameters, a pointer to a buffer of
@ -141,4 +142,107 @@ size_t parse(char *buffer, size_t size)
} }
```` ````
The channel object has many methods to declare queues and exchanges, to bind
and unbind them, to publish and consume messages - and more. You can best take a
look in the channel.h C++ header file for a list of all available methods. Every
method in it is well documented.
The constructor of the Channel object gets two parameters: the connection object,
and a pointer to a ChannelHandler object. In the first example that we gave we have
not yet used this ChannelHandler object. However, in normal circumstances when you
construct a Channel object, you should also pass a pointer to a ChannelHandler object.
Just like the ConnectionHandler class, the ChannelHandler class is a base class that
you should extend and override the virtual methods that you need. The AMQP library
will call these methods to inform you that an operation has succeeded or has failed.
For example, if you call the channel.declareQueue() method, the AMQP library will
internally send a message to the RabbitMQ message broker to ask it to declare the
queue. If the method returns true it only means that the message has succesfully
been sent, but not that the queue has really been declared. This is only known
after the server has sent back a message to the client to report whether the
queue was succesfully created or not. When this answer is received, the AMQP library
will call the method ChannelHandler::onQueueDeclared() method - which you can
implement in the ChannelHandler object.
All methods in the base ChannelHandler class have a default empty implementation,
so you do not have to implement all of them - only the ones that you are interested
in.
````c++
#include <amqp.h>
class MyChannelHandler : public AMQP::ChannelHandler
{
public:
/**
* Method that is called when an error occurs on the channel, and
* the channel ends up in an error state
* @param channel the channel on which the error occured
* @param message human readable error message
*/
virtual void onError(AMQP::Channel *channel, const std::string &message)
{
// @todo
// do something with the error message (like reporting it to the end-user)
// and destruct the channel object because it now no longer is usable
}
/**
* Method that is called when a queue has been declared
* @param channel the channel via which the queue was declared
* @param name name of the queue
* @param messageCount number of messages in queue
* @param consumerCount number of active consumers
*/
virtual void onQueueDeclared(AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount)
{
// @todo
// do something with the information that cam back, or start using the queue
}
};
````
Let's take a closer look at one of the methods in the Channel object to explain
two other concepts of this AMQP library: flags and tables. The method that we
will be looking at is the Channel::declareQueue() method:
````c++
/**
* Declare a queue
*
* If you do not supply a name, a name will be assigned by the server.
*
* The flags can be a combination of the following values:
*
* - durable queue survives a broker restart
* - autodelete queue is automatically removed when all connected consumers are gone
* - passive only check if the queue exist
* - exclusive the queue only exists for this connection, and is automatically removed when connection is gone
*
* @param name name of the queue
* @param flags combination of flags
* @param arguments optional arguments
*/
bool declareQueue(const std::string &name, int flags, const Table &arguments) { return _implementation.declareQueue(name, flags, arguments); }
````
Many methods in the Channel class support have a parameter named 'flags'. This
is a variable in which you can enable a number of options. If you for example
want to create a durable, auto-deleted queue, you should pass in the value
AMQP::durable + AMQP::autodelete.
The declareQueue() method also accepts a arguments parameter, which is of type
Table. The Table object can be used as an associative array to send additional
options to RabbitMQ, that are often custom RabbitMQ extensions to the AMQP
standard:
````c++
// custom options that are passed to the declareQueue call
Table customOptions;
customOptions["x-message-ttl"] = 3600 * 1000;
customOptions["x-expires"] = 7200 * 1000;
// declare the queue
channel.declareQueue("my-queue-name", AMQP::durable + AMQP::autodelete, customOptions);
````

View File

@ -103,9 +103,9 @@ public:
* *
* The following flags can be used for the exchange: * The following flags can be used for the exchange:
* *
* - amqp_durable exchange survives a broker restart * - durable exchange survives a broker restart
* - amqp_autodelete exchange is automatically removed when all connected queues are removed * - autodelete exchange is automatically removed when all connected queues are removed
* - amqp_passive only check if the exchange exist * - passive only check if the exchange exist
* *
* @param name name of the exchange * @param name name of the exchange
* @param type exchange type * @param type exchange type
@ -125,7 +125,7 @@ public:
* *
* The following flags can be used for the exchange: * The following flags can be used for the exchange:
* *
* - amqp_ifunused only delete if no queues are connected * - ifunused only delete if no queues are connected
* @param name name of the exchange to remove * @param name name of the exchange to remove
* @param flags optional flags * @param flags optional flags
@ -138,7 +138,7 @@ public:
* *
* The following flags can be used for the exchange * The following flags can be used for the exchange
* *
* - amqp_nowait do not wait on response * - nowait do not wait on response
* *
* @param source the source exchange * @param source the source exchange
* @param target the target exchange * @param target the target exchange
@ -156,7 +156,7 @@ public:
* *
* The following flags can be used for the exchange * The following flags can be used for the exchange
* *
* - amqp_nowait do not wait on response * - nowait do not wait on response
* *
* @param target the target exchange * @param target the target exchange
* @param source the source exchange * @param source the source exchange
@ -176,10 +176,10 @@ public:
* *
* The flags can be a combination of the following values: * The flags can be a combination of the following values:
* *
* - amqp_durable queue survives a broker restart * - durable queue survives a broker restart
* - amqp_autodelete queue is automatically removed when all connected consumers are gone * - autodelete queue is automatically removed when all connected consumers are gone
* - amqp_passive only check if the queue exist * - passive only check if the queue exist
* - amqp_exclusive the queue only exists for this connection, and is automatically removed when connection is gone * - exclusive the queue only exists for this connection, and is automatically removed when connection is gone
* *
* @param name name of the queue * @param name name of the queue
* @param flags combination of flags * @param flags combination of flags
@ -197,7 +197,7 @@ public:
* *
* The following flags can be used for the exchange * The following flags can be used for the exchange
* *
* - amqp_noWait do not wait on response * - nowait do not wait on response
* *
* @param queue the target queue * @param queue the target queue
* @param exchange the source exchange * @param exchange the source exchange
@ -226,7 +226,7 @@ public:
* *
* The following flags can be used for the exchange * The following flags can be used for the exchange
* *
* - amqp_noWait do not wait on response * - nowait do not wait on response
* *
* @param name name of the queue * @param name name of the queue
* @param flags additional flags * @param flags additional flags
@ -239,8 +239,8 @@ public:
* *
* The following flags can be used for the exchange: * The following flags can be used for the exchange:
* *
* - amqp_ifunused only delete if no consumers are connected * - ifunused only delete if no consumers are connected
* - amqp_ifempty only delete if the queue is empty * - ifempty only delete if the queue is empty
* *
* @param name name of the queue to remove * @param name name of the queue to remove
* @param flags optional flags * @param flags optional flags

View File

@ -23,7 +23,6 @@ class ChannelHandler
public: public:
/** /**
* Method that is called when the channel was succesfully created. * Method that is called when the channel was succesfully created.
* Only after the channel was created, you can use it for subsequent messages over it
* @param channel * @param channel
*/ */
virtual void onReady(Channel *channel) {} virtual void onReady(Channel *channel) {}
@ -37,66 +36,77 @@ public:
/** /**
* Method that is called when the channel was paused * Method that is called when the channel was paused
* This is the result of a call to Channel::pause()
* @param channel * @param channel
*/ */
virtual void onPaused(Channel *channel) {} virtual void onPaused(Channel *channel) {}
/** /**
* Method that is called when the channel was resumed * Method that is called when the channel was resumed
* This is the result of a call to Channel::resume()
* @param channel * @param channel
*/ */
virtual void onResumed(Channel *channel) {} virtual void onResumed(Channel *channel) {}
/** /**
* Method that is called when a channel is closed * Method that is called when a channel is closed
* This is the result of a call to Channel::close()
* @param channel * @param channel
*/ */
virtual void onClosed(Channel *channel) {} virtual void onClosed(Channel *channel) {}
/** /**
* Method that is called when a transaction was started * Method that is called when a transaction was started
* This is the result of a call to Channel::startTransaction()
* @param channel * @param channel
*/ */
virtual void onTransactionStarted(Channel *channel) {} virtual void onTransactionStarted(Channel *channel) {}
/** /**
* Method that is called when a transaction was committed * Method that is called when a transaction was committed
* This is the result of a call to Channel::commitTransaction()
* @param channel * @param channel
*/ */
virtual void onTransactionCommitted(Channel *channel) {} virtual void onTransactionCommitted(Channel *channel) {}
/** /**
* Method that is called when a transaction was rolled back * Method that is called when a transaction was rolled back
* This is the result of a call to Channel::rollbackTransaction()
* @param channel * @param channel
*/ */
virtual void onTransactionRolledBack(Channel *channel) {} virtual void onTransactionRolledBack(Channel *channel) {}
/** /**
* Method that is called when an exchange is bound * Method that is called when an exchange is bound
* This is the result of a call to Channel::bindExchange()
* @param channel * @param channel
*/ */
virtual void onExchangeBound(Channel *channel) {} virtual void onExchangeBound(Channel *channel) {}
/** /**
* Method that is called when an exchange is unbound * Method that is called when an exchange is unbound
* This is the result of a call to Channel::unbindExchange()
* @param channel * @param channel
*/ */
virtual void onExchangeUnbound(Channel *channel) {} virtual void onExchangeUnbound(Channel *channel) {}
/** /**
* Method that is called when an exchange is deleted * Method that is called when an exchange is deleted
* This is the result of a call to Channel::deleteExchange()
* @param channel * @param channel
*/ */
virtual void onExchangeDeleted(Channel *channel) {} virtual void onExchangeDeleted(Channel *channel) {}
/** /**
* Mehod that is called when an exchange is declared * Mehod that is called when an exchange is declared
* This is the result of a call to Channel::declareExchange()
* @param channel * @param channel
*/ */
virtual void onExchangeDeclared(Channel *channel) {} virtual void onExchangeDeclared(Channel *channel) {}
/** /**
* Method that is called when a queue is declared * Method that is called when a queue is declared
* This is the result of a call to Channel::declareQueue()
* @param channel * @param channel
* @param name name of the queue * @param name name of the queue
* @param messageCount number of messages in queue * @param messageCount number of messages in queue
@ -106,6 +116,7 @@ public:
/** /**
* Method that is called when a queue is bound * Method that is called when a queue is bound
* This is the result of a call to Channel::bindQueue()
* @param channel * @param channel
* @param * @param
*/ */
@ -113,6 +124,7 @@ public:
/** /**
* Method that is called when a queue is deleted * Method that is called when a queue is deleted
* This is the result of a call to Channel::deleteQueue()
* @param channel * @param channel
* @param messageCount number of messages deleted along with the queue * @param messageCount number of messages deleted along with the queue
*/ */
@ -120,12 +132,14 @@ public:
/** /**
* Method that is called when a queue is unbound * Method that is called when a queue is unbound
* This is the result of a call to Channel::unbindQueue()
* @param channel * @param channel
*/ */
virtual void onQueueUnbound(Channel *channel) {} virtual void onQueueUnbound(Channel *channel) {}
/** /**
* Method that is called when a queue is purged * Method that is called when a queue is purged
* This is the result of a call to Channel::purgeQueue()
* @param messageCount number of message purged * @param messageCount number of message purged
*/ */
virtual void onQueuePurged(Channel *channel, uint32_t messageCount) {} virtual void onQueuePurged(Channel *channel, uint32_t messageCount) {}