AMQP-CPP
========

**Are you upgrading from AMQP-CPP 3 to AMQP-CPP 4?** [Please read the upgrade instructions](#upgrading)

AMQP-CPP is a C++ library for communicating with a RabbitMQ message broker. The
library can be used to parse incoming data from a RabbitMQ server, and to
generate frames that can be sent to a RabbitMQ server.

This library has a layered architecture, and allows you - if you like - to
completely take care of the network layer. If you want to set up and manage the
network connections yourself, the AMQP-CPP library will not make a connection to
RabbitMQ by itself, nor will it create sockets and/or perform IO operations. As
a user of this library, you create the socket connection and implement a certain
interface that you pass to the AMQP-CPP library and that the library will use
for IO operations.

Intercepting this network layer is optional, however. The AMQP-CPP library also
comes with a predefined TCP and TLS module that can be used if you trust the AMQP-CPP
library to take care of the network (and optional TLS) handling. In that case, the
AMQP-CPP library does all the system and library calls to set up network connections
and send and receive the (possibly encrypted) data.

This layered architecture makes the library extremely flexible and portable: it
does not necessarily rely on operating system specific IO calls, and can be
easily integrated into any kind of event loop. If you want to implement the AMQP
protocol on top of some [unusual other communication layer](https://tools.ietf.org/html/rfc1149),
this library can be used for that - but if you want to use it with regular TCP
connections, setting it up is just as easy.

AMQP-CPP is fully asynchronous and does not do any blocking (system) calls, so
it can be used in high performance applications without the need for threads.

The AMQP-CPP library uses C++11 features, so if you intend to use it, please make
sure that your compiler is up-to-date and supports C++11.

**Note for the reader:** This readme file has a peculiar structure. We start by
explaining the low-level interface, in which you have to take care of opening
socket connections yourself. In reality, you probably want to use the simpler
TCP interface that is described [later on](#tcp-connections).

ABOUT
=====

This library is created and maintained by Copernica (www.copernica.com), and is
used inside the MailerQ (www.mailerq.com) and Yothalot (www.yothalot.com) applications.
MailerQ is a tool for sending large volumes of email, using AMQP message queues, and Yothalot
is a big data processing map/reduce framework.

Do you appreciate our work and are you looking for high quality email solutions?
Then check out our other commercial and open source solutions:

* Copernica Marketing Suite (www.copernica.com)
* MailerQ on-premise MTA (www.mailerq.com)
* Responsive Email web service (www.responsiveemail.com)
* SMTPeter cloud based SMTP server (www.smtpeter.com)
* PHP-CPP bridge between PHP and C++ (www.php-cpp.com)
* PHP-JS bridge between PHP and JavaScript (www.php-js.com)
* Yothalot big data processor (www.yothalot.com)

INSTALLING
==========


AMQP-CPP comes with an optional Linux-only TCP module that takes care of the
network part required for the AMQP-CPP core library. If you use this module, you
are required to link with `pthread` and `dl`.

There are two methods to compile AMQP-CPP: CMake and Make. CMake is platform portable
and works on all systems, while the Makefile only works on Linux. The two methods
create both a shared and a static version of the AMQP-CPP library. Building of a
shared library is currently not supported on Windows.

After building there are two relevant files to `#include` when you use the library.

 File                | Include when?
---------------------|--------------------------------------------------------
 amqpcpp.h           | Always needed for the core features
 amqpcpp/linux_tcp.h | If using the Linux-only TCP module

On Windows you are required to define `NOMINMAX` when compiling code that includes public AMQP-CPP header files.

## Using cmake

The CMake file supports both building and installing. You can choose not to use
the install functionality, and instead manually use the build output at `build/bin/`. Keep
in mind that the TCP module is only supported for Linux. An example install method
would be:

```bash
mkdir build
cd build
cmake .. [-DAMQP-CPP_BUILD_SHARED=ON] [-DAMQP-CPP_LINUX_TCP=ON]
cmake --build . --target install
```

 Option                  | Default | Meaning
-------------------------|---------|-----------------------------------------------------------------------
 AMQP-CPP_BUILD_SHARED   | OFF     | Shared lib (ON) or static lib (OFF)? Shared is not supported on Windows.
 AMQP-CPP_LINUX_TCP      | OFF     | Should the Linux-only TCP module be built?

## Using make

Compiling and installing AMQP-CPP with make is as easy as running `make` and
then `make install`. This will install the full version of AMQP-CPP, including
the system specific TCP module. If you do not need the additional TCP module
(because you take care of handling the network yourself), you can also
compile a pure form of the library. Use `make pure` and `make install` for that.

When you compile an application that uses the AMQP-CPP library, do not
forget to link with the library. For gcc and clang the linker flag is `-lamqpcpp`.
If you use the full version of AMQP-CPP (with the TCP module), you also
need to pass the `-lpthread` and `-ldl` linker flags, because the TCP module uses a
thread for running an asynchronous and non-blocking DNS hostname lookup, and it
must be linked with the "dl" library to allow dynamic lookups for functions from
the openssl library if a secure connection to RabbitMQ has to be set up.

HOW TO USE AMQP-CPP
===================


As we mentioned above, the library can be used in a network-agnostic fashion.
It then does not do any IO by itself, and you need to pass an object to the
library that the library can use for IO. So, before you start using the
library, you first need to create a class that extends from the
ConnectionHandler base class. This is a class with a number of methods that are
called by the library every time it wants to send out data, or when it needs to
inform you that an error occurred.

````c++
#include <amqpcpp.h>

class MyConnectionHandler : public AMQP::ConnectionHandler
{
    /**
     *  Method that is called by the AMQP library every time it has data
     *  available that should be sent to RabbitMQ.
     *  @param  connection  pointer to the main connection object
     *  @param  data        memory buffer with the data that should be sent to RabbitMQ
     *  @param  size        size of the buffer
     */
    virtual void onData(AMQP::Connection *connection, const char *data, size_t size)
    {
        // @todo
        //  Add your own implementation, for example by doing a call to the
        //  send() system call. But be aware that the send() call may not
        //  send all data at once, so you also need to take care of buffering
        //  the bytes that could not immediately be sent, and try to send
        //  them again when the socket becomes writable again
    }

    /**
     *  Method that is called by the AMQP library when the login attempt
     *  succeeded. After this method has been called, the connection is ready
     *  to use.
     *  @param  connection      The connection that can now be used
     */
    virtual void onReady(AMQP::Connection *connection)
    {
        // @todo
        //  add your own implementation, for example by creating a channel
        //  instance, and start publishing or consuming
    }

    /**
     *  Method that is called by the AMQP library when a fatal error occurs
     *  on the connection, for example because data received from RabbitMQ
     *  could not be recognized.
     *  @param  connection      The connection on which the error occurred
     *  @param  message         A human readable error message
     */
    virtual void onError(AMQP::Connection *connection, const char *message)
    {
        // @todo
        //  add your own implementation, for example by reporting the error
        //  to the user of your program, log the error, and destruct the
        //  connection object because it is no longer in a usable state
    }

    /**
     *  Method that is called when the connection was closed. This is the
     *  counterpart of a call to Connection::close() and it confirms that the
     *  AMQP connection was correctly closed.
     *
     *  @param  connection      The connection that was closed and that is now unusable
     */
    virtual void onClosed(AMQP::Connection *connection)
    {
        // @todo
        //  add your own implementation, for example by closing down the
        //  underlying TCP connection too
    }
};
````

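The comments in onData() above point out that send() may not accept all bytes at
once. As a minimal sketch of that buffering, and not as part of the AMQP-CPP API,
a helper like the one below could hold the unsent bytes. The names `OutBuffer` and
`Writer` are hypothetical, and the `Writer` callback stands in for your own wrapper
around send() or write() that returns 0 when the socket would block.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical helper (not part of AMQP-CPP): stores bytes that could not be
// sent yet, so that they can be retried when the socket becomes writable.
class OutBuffer
{
public:
    // A "try to send" function: returns the number of bytes that were
    // accepted (possibly fewer than size), 0 if the call would block,
    // or a negative value on error.
    using Writer = std::function<long(const char *data, std::size_t size)>;

    // Queue data for sending; an onData() implementation could call this
    void add(const char *data, std::size_t size)
    {
        _pending.append(data, size);
    }

    // Send as much queued data as the writer accepts.
    // Returns false on a write error, true otherwise.
    bool flush(const Writer &writer)
    {
        while (!_pending.empty())
        {
            long sent = writer(_pending.data(), _pending.size());
            if (sent < 0) return false;
            if (sent == 0) break;   // would block: retry when writable
            assert(static_cast<std::size_t>(sent) <= _pending.size());
            _pending.erase(0, static_cast<std::size_t>(sent));
        }
        return true;
    }

    // Number of bytes that still have to be sent
    std::size_t size() const { return _pending.size(); }

private:
    std::string _pending;
};
```

In this sketch, onData() would call `add()` followed by `flush()`, and your event
loop would call `flush()` again whenever the socket is reported writable and
`size()` is not zero.
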

After you've implemented the ConnectionHandler class (which is entirely up to
you), you can start using the library by creating a Connection object, and one
or more Channel objects:

````c++
// create an instance of your own connection handler
MyConnectionHandler myHandler;

// create an AMQP connection object
AMQP::Connection connection(&myHandler, AMQP::Login("guest","guest"), "/");

// and create a channel
AMQP::Channel channel(&connection);

// use the channel object to call the AMQP method you like
channel.declareExchange("my-exchange", AMQP::fanout);
channel.declareQueue("my-queue");
channel.bindQueue("my-exchange", "my-queue", "my-routing-key");
````


A number of remarks about the example above. First you may have noticed that we've
created all objects on the stack. You are of course also free to create them
on the heap with the C++ operator 'new'. That works just as well, and is in real
life code probably more useful as you normally want to keep your handlers, connection
and channel objects around for a longer time.

But more importantly, you can see in the example above that we instantiated the
channel object directly after we made the connection object, and we also
started declaring exchanges and queues right away. However, under the hood, a handshake
protocol is executed between the server and the client when the Connection
object is first created. During this handshake procedure it is not permitted to send
other instructions (like opening a channel or declaring a queue). It would therefore have been better
if we had first waited for the connection to be ready (implement the MyConnectionHandler::onReady() method),
and created the channel object only then. But this is not strictly necessary.
The methods that are called before the handshake is completed are cached by the
AMQP library and will be executed the moment the handshake is completed and the
connection becomes ready for use.
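
To make the idea of caching early calls concrete, the sketch below queues operations
until a ready signal arrives and then runs them in order. It is only an illustration
of the concept and does not show how AMQP-CPP implements it internally; the class
name `DeferredCalls` and its methods are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>

// Illustration only (not AMQP-CPP code): operations passed to run() are
// postponed until ready() has been called, later operations run right away.
class DeferredCalls
{
public:
    using Call = std::function<void()>;

    // Run the operation now, or remember it if we are not ready yet
    void run(Call call)
    {
        if (_ready) call();
        else _queue.push(std::move(call));
    }

    // Signal that the handshake is complete: run all postponed operations
    // in the order in which they were added
    void ready()
    {
        assert(!_ready);    // the handshake completes only once
        while (!_queue.empty())
        {
            Call call = std::move(_queue.front());
            _queue.pop();
            call();
        }
        _ready = true;
    }

    // Number of operations that are still waiting
    std::size_t pending() const { return _queue.size(); }

private:
    bool _ready = false;
    std::queue<Call> _queue;
};
```

Because `_ready` is only set after the queue has been drained, an operation that is
added while the postponed ones are running is still executed in order.
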

PARSING INCOMING DATA
=====================

The ConnectionHandler class has a method onData() that is called by the library
every time that it wants to send out data. We've explained that it is up to you to
implement that method. Inside your ConnectionHandler::onData() method, you can for
example call the "send()" or "write()" system call to send out the data to
the RabbitMQ server. But what about data in the other direction? How does the
library receive data back from RabbitMQ?

In this raw setup, the AMQP-CPP library does not do any IO by itself and it is
therefore also not possible for the library to receive data from a
socket. It is again up to you to do this. If, for example, you notice in your
event loop that the socket that is connected with the RabbitMQ server becomes
readable, you should read out that socket (for example by using the recv() system
call), and pass the received bytes to the AMQP-CPP library. This is done by
calling the parse() method in the Connection object.

The Connection::parse() method takes two parameters: a pointer to a buffer of
data that you just read from the socket, and a parameter that holds the size of
this buffer. The code snippet below comes from the Connection.h C++ header file.

````c++
/**
 *  Parse data that was received from RabbitMQ
 *
 *  Every time that data comes in from RabbitMQ, you should call this method to parse
 *  the incoming data, and have it handled by the AMQP-CPP library. This method returns
 *  the number of bytes that were processed.
 *
 *  If not all bytes could be processed because it only contained a partial frame,
 *  you should call this same method later on when more data is available. The
 *  AMQP-CPP library does not do any buffering, so it is up to the caller to ensure
 *  that the old data is also passed in that later call.
 *
 *  @param  buffer      buffer to decode
 *  @param  size        size of the buffer to decode
 *  @return             number of bytes that were processed
 */
size_t parse(char *buffer, size_t size)
{
    return _implementation.parse(buffer, size);
}
````


You should do all the bookkeeping for the buffer yourself. If you for example
call the Connection::parse() method with a buffer of 100 bytes, and the method
returns that only 60 bytes were processed, you should later call the method again,
with a buffer filled with the remaining 40 bytes. If the method returns 0, you should
make a new call to parse() when more data is available, with a buffer that contains
both the old data, and the new data.
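
The bookkeeping described above could be sketched as follows. This is a minimal
example under assumptions, not library code: `ParseBuffer` and `Parser` are
hypothetical names, and the `Parser` callback stands in for a call to
Connection::parse() that returns the number of bytes it processed.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical helper (not part of AMQP-CPP): keeps the bytes that the parser
// did not process, so that they are passed in again together with newer data.
class ParseBuffer
{
public:
    // Stand-in for Connection::parse(): returns the number of bytes processed
    using Parser = std::function<std::size_t(char *buffer, std::size_t size)>;

    // Append bytes that were just read from the socket, and hand everything
    // to the parser until it can make no further progress
    void feed(const char *data, std::size_t size, const Parser &parser)
    {
        _data.insert(_data.end(), data, data + size);

        std::size_t offset = 0;
        while (offset < _data.size())
        {
            std::size_t used = parser(_data.data() + offset, _data.size() - offset);
            assert(used <= _data.size() - offset);
            if (used == 0) break;   // only a partial frame left, wait for more
            offset += used;
        }

        // drop the processed bytes, keep the remainder for the next call
        _data.erase(_data.begin(), _data.begin() + static_cast<std::ptrdiff_t>(offset));
    }

    // Number of bytes that are waiting for more data
    std::size_t size() const { return _data.size(); }

private:
    std::vector<char> _data;
};
```

With a real connection, the parser argument would be a small lambda that forwards
to `connection.parse(buffer, size)`, and `feed()` would be called with whatever
recv() returned.
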

To optimize your calls to the parse() method, you _could_ use the Connection::expected()
and Connection::maxFrame() methods. The expected() method returns the number of bytes
that the library prefers to receive next. It is pointless to call the parse() method
with a smaller buffer, and it is best to call the method with a buffer of exactly this
size. The maxFrame() method returns the maximum frame size for AMQP messages. If you read your
messages into a reusable buffer, you could allocate this buffer up to this size, so that
you will never have to reallocate.

TCP CONNECTIONS
===============


Although the AMQP-CPP library gives you extreme flexibility by letting you set up
your own network connections, the reality is that most if not all AMQP connections
use the TCP protocol. To help you out, the library therefore also comes with a
TCP module that takes care of setting up the network connections, and sending
and receiving the data.

If you want to use this TCP module, you should not use the AMQP::Connection
and AMQP::Channel classes that you saw above, but the alternative AMQP::TcpConnection
and AMQP::TcpChannel classes instead. You also do not create your own class
that implements the "AMQP::ConnectionHandler" interface, but a class that inherits
from "AMQP::TcpHandler" instead. This AMQP::TcpHandler class contains a set of
methods that you can override to intercept all sorts of events that occur during the
TCP and AMQP connection lifetime. Overriding these methods is mostly optional, because
almost all have a default implementation. But you do need to implement the
"monitor()" method, as that is needed by the AMQP-CPP library to interact with
the main event loop:

````c++
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>

class MyTcpHandler : public AMQP::TcpHandler
{
    /**
     *  Method that is called by the AMQP library when a new connection
     *  is associated with the handler. This is the first call to your handler
     *  @param  connection      The connection that is attached to the handler
     */
    virtual void onAttached(AMQP::TcpConnection *connection) override
    {
        // @todo
        //  add your own implementation, for example initialize things
        //  to handle the connection.
    }

    /**
     *  Method that is called by the AMQP library when the TCP connection
     *  has been established. After this method has been called, the library
     *  still has to take care of setting up the optional TLS layer and of
     *  setting up the AMQP connection on top of the TCP layer. This method
     *  is always paired with a later call to onLost().
     *  @param  connection      The connection that can now be used
     */
    virtual void onConnected(AMQP::TcpConnection *connection) override
    {
        // @todo
        //  add your own implementation (probably not needed)
    }

    /**
     *  Method that is called when the secure TLS connection has been established.
     *  This is only called for amqps:// connections. It allows you to inspect
     *  whether the connection is secure enough for your liking (you can
     *  for example check the server certificate). The AMQP protocol still has
     *  to be started.
     *  @param  connection      The connection that has been secured
     *  @param  ssl             SSL structure from openssl library
     *  @return bool            True if connection can be used
     */
    virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override
    {
        // @todo
        //  add your own implementation, for example by reading out the
        //  certificate and check if it is indeed yours
        return true;
    }

    /**
     *  Method that is called by the AMQP library when the login attempt
     *  succeeded. After this the connection is ready to use.
     *  @param  connection      The connection that can now be used
     */
    virtual void onReady(AMQP::TcpConnection *connection) override
    {
        // @todo
        //  add your own implementation, for example by creating a channel
        //  instance, and start publishing or consuming
    }

    /**
     *  Method that is called by the AMQP library when a fatal error occurs
     *  on the connection, for example because data received from RabbitMQ
     *  could not be recognized, or the underlying connection is lost. This
     *  call is normally followed by a call to onLost() (if the error occurred
     *  after the TCP connection was established) and onDetached().
     *  @param  connection      The connection on which the error occurred
     *  @param  message         A human readable error message
     */
    virtual void onError(AMQP::TcpConnection *connection, const char *message) override
    {
        // @todo
        //  add your own implementation, for example by reporting the error
        //  to the user of your program and logging the error
    }

    /**
     *  Method that is called when the AMQP protocol is ended. This is the
     *  counterpart of a call to connection.close() to gracefully shut down
     *  the connection. Note that the TCP connection is at this time still
     *  active, and you will also receive calls to onLost() and onDetached().
     *  @param  connection      The connection over which the AMQP protocol ended
     */
    virtual void onClosed(AMQP::TcpConnection *connection) override
    {
        // @todo
        //  add your own implementation (probably not necessary, but it could
        //  be useful if you want to do something immediately after the
        //  amqp connection is over, but do not want to wait for the tcp
        //  connection to shut down)
    }

    /**
     *  Method that is called when the TCP connection was closed or lost.
     *  This method is always called if there was also a call to onConnected()
     *  @param  connection      The connection that was closed and that is now unusable
     */
    virtual void onLost(AMQP::TcpConnection *connection) override
    {
        // @todo
        //  add your own implementation (probably not necessary)
    }

    /**
     *  Final method that is called. This signals that no further calls to your
     *  handler will be made about the connection.
     *  @param  connection      The connection that can be destructed
     */
    virtual void onDetached(AMQP::TcpConnection *connection) override
    {
        // @todo
        //  add your own implementation, like cleaning up resources or exiting the application
    }

    /**
     *  Method that is called by the AMQP-CPP library when it wants to interact
     *  with the main event loop. The AMQP-CPP library is completely non-blocking,
     *  and only makes "write()" or "read()" system calls when it knows in advance
     *  that these calls will not block. To register a filedescriptor in the
     *  event loop, it calls this "monitor()" method with a filedescriptor and
     *  flags telling whether the filedescriptor should be checked for readability
     *  or writability.
     *
     *  @param  connection      The connection that wants to interact with the event loop
     *  @param  fd              The filedescriptor that should be checked
     *  @param  flags           Bitwise or of AMQP::readable and/or AMQP::writable
     */
    virtual void monitor(AMQP::TcpConnection *connection, int fd, int flags) override
    {
        // @todo
        //  add your own implementation, for example by adding the file
        //  descriptor to the main application event loop (like the select() or
        //  poll() loop). When the event loop reports that the descriptor becomes
        //  readable and/or writable, it is up to you to inform the AMQP-CPP
        //  library that the filedescriptor is active by calling the
        //  connection->process(fd, flags) method.
    }
};
````


You see that there are many methods in TcpHandler that you can implement. The most important
one is "monitor()". This method is used to integrate the AMQP filedescriptors in your
application's event loop. For some popular event loops (libev, libuv, libevent), we
have already added example handler objects (see the next section for that). All the
other methods are optional to override. It often is a good idea to override the
onError() method to log or report errors and onDetached() for cleaning up resources.
AMQP-CPP has its own buffers if you send instructions prematurely, but if you
intend to send a lot of data over the connection, it also is a good idea to
implement the onReady() method and delay your calls until the AMQP connection
has been fully set up.
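
As a rough illustration of what a monitor() implementation has to keep track of,
the sketch below stores the requested flags per filedescriptor and converts them
to the `pollfd` entries that the poll() system call expects. `PollSet`, `kReadable`
and `kWritable` are hypothetical names; the two constants merely stand in for
AMQP::readable and AMQP::writable. The sketch also assumes that a call with flags
set to 0 means that the filedescriptor no longer has to be watched.

```cpp
#include <cassert>
#include <map>
#include <poll.h>
#include <vector>

// Stand-ins for AMQP::readable and AMQP::writable (assumed values; with the
// real library you would use the library constants instead)
const int kReadable = 1;
const int kWritable = 2;

// Hypothetical registry that a monitor() implementation could update
class PollSet
{
public:
    // Remember which events to watch for fd; flags == 0 removes the fd
    void monitor(int fd, int flags)
    {
        assert(fd >= 0);
        if (flags == 0) _flags.erase(fd);
        else _flags[fd] = flags;
    }

    // Build the array that can be passed to poll(), ordered by fd
    std::vector<pollfd> entries() const
    {
        std::vector<pollfd> result;
        for (const auto &item : _flags)
        {
            pollfd entry;
            entry.fd = item.first;
            entry.events = 0;
            entry.revents = 0;
            if (item.second & kReadable) entry.events |= POLLIN;
            if (item.second & kWritable) entry.events |= POLLOUT;
            result.push_back(entry);
        }
        return result;
    }

    // Translate the result of poll() back to readable/writable flags.
    // Hang-ups and errors are reported as readable, so that the next read
    // attempt can detect the problem.
    static int active(const pollfd &entry)
    {
        int flags = 0;
        if (entry.revents & (POLLIN | POLLHUP | POLLERR)) flags |= kReadable;
        if (entry.revents & POLLOUT) flags |= kWritable;
        return flags;
    }

private:
    std::map<int, int> _flags;
};
```

In this sketch your monitor() method would forward to `PollSet::monitor()`, and
after poll() returns you would call connection->process(fd, flags) for every entry
for which `PollSet::active()` is not zero.
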

Using the TCP module of the AMQP-CPP library is easier than using the
raw AMQP::Connection and AMQP::Channel objects, because you do not have to
create the sockets and connections yourself, and you also do not have to take
care of buffering network data. The example that we gave above looks slightly
different if you make use of the TCP module:

````c++
// create an instance of your own tcp handler
MyTcpHandler myHandler;

// address of the server
AMQP::Address address("amqp://guest:guest@localhost/vhost");

// create an AMQP connection object
AMQP::TcpConnection connection(&myHandler, address);

// and create a channel
AMQP::TcpChannel channel(&connection);

// use the channel object to call the AMQP method you like
channel.declareExchange("my-exchange", AMQP::fanout);
channel.declareQueue("my-queue");
channel.bindQueue("my-exchange", "my-queue", "my-routing-key");
````


SECURE CONNECTIONS
==================

The TCP module of AMQP-CPP also supports setting up secure connections. If your
RabbitMQ server accepts SSL connections, you can specify the address to your
server using the amqps:// protocol:

````c++
// init the SSL library (this works for openssl 1.1, for openssl 1.0 use SSL_library_init())
OPENSSL_init_ssl(0, NULL);

// address of the server (secure!)
AMQP::Address address("amqps://guest:guest@localhost/vhost");

// create an AMQP connection object
AMQP::TcpConnection connection(&myHandler, address);
````

There are two things to take care of if you want to create a secure connection:
(1) you must link your application with the -lssl flag (or use dlopen()), and (2)
you must initialize the openssl library by calling OPENSSL_init_ssl(). This
initialization must take place before you let your application connect to RabbitMQ.
This is necessary because AMQP-CPP needs access to the openssl library to set up
secure connections. It can only access this library if you have linked your
application with this library, or if you have loaded this library at runtime
using dlopen().

Linking openssl is the normal thing to do. You just have to add the `-lssl` flag
to your linker. If you however do not want to link your application with openssl,
you can also load the openssl library at runtime, and pass the handle to
AMQP-CPP:

````c++
// dynamically open the openssl library (dlopen() is declared in <dlfcn.h>)
void *handle = dlopen("/path/to/openssl.so", RTLD_LAZY);

// tell AMQP-CPP library where the handle to openssl can be found
AMQP::openssl(handle);

// @todo call functions to initialize openssl, and create the AMQP connection
// (see example above)
````

By itself, AMQP-CPP does not check if the created TLS connection is sufficiently
secure. Whether the certificate is expired, self-signed, missing or invalid: none
of this matters to AMQP-CPP, and the connection is simply permitted. If you
want to be more strict (for example: if you want to verify the server's certificate),
you must do this yourself by implementing the "onSecured()" method in your handler
object:

````c++
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>

class MyTcpHandler : public AMQP::TcpHandler
{
    /**
     * Method that is called right after the TLS connection has been created.
     * In this method you can check the connection properties (like the certificate)
     * and return false if you find it not secure enough
     * @param connection the connection that has just completed the tls handshake
     * @param ssl SSL structure from the openssl library
     * @return bool true if connection is secure enough to start the AMQP protocol
     */
    virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override
    {
        // @todo call functions from the openssl library to check the certificate,
        // like SSL_get_peer_certificate() or SSL_get_verify_result().
        // For now we always allow the connection to proceed
        return true;
    }

    /**
     * All other methods (like onConnected(), onError(), etc) are left out of this
     * example, but would be here if this was an actual user space handler class.
     */
};
````

The SSL pointer that is passed to the onSecured() method refers to the "SSL"
structure from the openssl library.

EXISTING EVENT LOOPS
====================

Both the pure AMQP::Connection and the easier AMQP::TcpConnection class
allow you to integrate AMQP-CPP in your own event loop. Whether you take care
of running the event loop yourself (for example by using the select() system
call), or use an existing library for it (like libevent, libev or libuv),
you can implement the "monitor()" method to watch the file descriptors and
hand control back to AMQP-CPP when one of the sockets becomes active.

For libev, libuv and libevent users, we have even included an example
implementation, so that you do not have to do this yourself. Instead of implementing
the monitor() method, you can use the AMQP::LibEvHandler,
AMQP::LibUvHandler or AMQP::LibEventHandler classes:

````c++
#include <ev.h>
#include <iostream>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

int main()
{
    // access to the event loop
    auto *loop = EV_DEFAULT;

    // handler for libev (so we don't have to implement AMQP::TcpHandler!)
    AMQP::LibEvHandler handler(loop);

    // make a connection
    AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://localhost/"));

    // we need a channel too
    AMQP::TcpChannel channel(&connection);

    // create a temporary queue
    channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) {

        // report the name of the temporary queue
        std::cout << "declared queue " << name << std::endl;

        // now we can close the connection
        connection.close();
    });

    // run the loop
    ev_run(loop, 0);

    // done
    return 0;
}
````

The AMQP::LibEvHandler and AMQP::LibEventHandler classes are extended AMQP::TcpHandler
classes, with an implementation of the monitor() method that simply adds the
file descriptor to the event loop. If you use one of these classes, it is recommended
not to instantiate it directly (like we did in the example), but to create your own
"MyHandler" class that extends from it, and in which you also implement the
onError() method to report possible connection errors to your end users.

Currently, we have example TcpHandler implementations for libev, libuv,
libevent, and Boost's asio. For other event loops we do not yet have
such examples. The quality of the libboostasio implementation is however
debatable: it was not developed and is not maintained by the original AMQP-CPP
developers, and it has a couple of open issues.

| TCP Handler Impl        | Header File Location   | Sample File Location      |
| ----------------------- | ---------------------- | ------------------------- |
| Boost asio (io_service) | include/libboostasio.h | examples/libboostasio.cpp |
| libev                   | include/libev.h        | examples/libev.cpp        |
| libevent                | include/libevent.h     | examples/libevent.cpp     |
| libuv                   | include/libuv.h        | examples/libuv.cpp        |

HEARTBEATS
==========

The AMQP protocol supports *heartbeats*. If this heartbeat feature is enabled, the
client and the server negotiate a heartbeat interval during connection setup, and
they agree to send at least *some kind of data* over the connection during every
iteration of that interval. The normal data that is sent over the connection (like
publishing or consuming messages) is usually sufficient to keep the connection alive,
but if the client or server was idle during the negotiated interval time, a dummy
heartbeat message must be sent instead.

The default behavior of the AMQP-CPP library is to disable heartbeats. The
heartbeat interval that the server proposes during connection setup (the server
normally suggests an interval of 60 seconds) is vetoed by the AMQP-CPP library so
no heartbeats ever need to be sent over the connection. This means that you
can safely keep your AMQP connection idle for as long as you like, and/or run
long-running algorithms after you've consumed a message from RabbitMQ, without having
to worry about the connection being idle for too long.

You can however choose to enable these heartbeats. If you want to enable heartbeats,
you should implement the onNegotiate() method inside your ConnectionHandler or
TcpHandler class and have it return the interval that you find appropriate.

````c++
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>

class MyTcpHandler : public AMQP::TcpHandler
{
    /**
     * Method that is called when the server tries to negotiate a heartbeat
     * interval, and that is overridden to get rid of the default implementation
     * (which vetoes the suggested heartbeat interval), and accept the interval
     * instead.
     * @param connection The connection on which the interval is negotiated
     * @param interval The suggested interval in seconds
     * @return uint16_t The interval that we want to use
     */
    virtual uint16_t onNegotiate(AMQP::TcpConnection *connection, uint16_t interval)
    {
        // we accept the suggestion from the server, but if the interval is smaller
        // than one minute, we will use a one minute interval instead
        if (interval < 60) interval = 60;

        // @todo
        //  set a timer in your event loop, and make sure that you call
        //  connection->heartbeat() every _interval_ seconds if no other
        //  instruction was sent in that period.

        // return the interval that we want to use
        return interval;
    }
};
````

If you enable heartbeats, it is your own responsibility to ensure that the
`connection->heartbeat()` method is called at least once during this period,
or that you call one of the other channel or connection methods to send data
over the connection. Heartbeats are sent by the server too: RabbitMQ also ensures
that _some data_ is sent over the connection from the server to the client
during the heartbeat interval. It is also your responsibility to shut down
the connection if you find out that the server stops sending data during
this period.

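
The bookkeeping that this requires can be kept separate from your event loop.
The sketch below is only an illustration and not part of AMQP-CPP: the
`HeartbeatTracker` class and its methods are hypothetical names. It assumes a
non-zero interval, and it uses two common conventions that you may want to
tune: a heartbeat is due after half of the interval has passed without
outgoing traffic, and the server is considered gone after two full intervals
without incoming traffic.

```cpp
#include <chrono>
#include <cstdint>

// hypothetical helper that keeps track of the traffic on one connection
class HeartbeatTracker
{
public:
    using Clock = std::chrono::steady_clock;

private:
    // the negotiated interval, and the last time data was sent and received
    std::chrono::seconds _interval;
    Clock::time_point _lastSent;
    Clock::time_point _lastReceived;

public:
    // construct the tracker with the interval that onNegotiate() returned
    HeartbeatTracker(uint16_t interval, Clock::time_point now) :
        _interval(interval), _lastSent(now), _lastReceived(now) {}

    // call this whenever any data (a heartbeat or a normal frame) is sent
    void sent(Clock::time_point now) { _lastSent = now; }

    // call this whenever any data is received from the server
    void received(Clock::time_point now) { _lastReceived = now; }

    // is it time to send a heartbeat? (assumption: after half an interval of silence)
    bool mustSend(Clock::time_point now) const { return now - _lastSent >= _interval / 2; }

    // did the server go silent? (assumption: after two intervals without data)
    bool expired(Clock::time_point now) const { return now - _lastReceived >= _interval * 2; }
};
```

A timer in your event loop could check `mustSend()` and `expired()` every few
seconds, and call `connection->heartbeat()` or close the connection
accordingly.
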
If you use the AMQP::LibEvHandler event loop implementation, heartbeats are
enabled by default, and all these checks are automatically performed.

CHANNELS
========

In the above example we created a channel object. A channel is a sort of virtual
connection, and it is possible to create many channels that all use
the same connection.

AMQP instructions are always sent over a channel, so before you can send the first
command to the RabbitMQ server, you first need a channel object. The channel
object has many methods to send instructions to the RabbitMQ server. For
example, it has methods to declare queues and exchanges, to bind and unbind them,
and to publish and consume messages. The channel.h C++ header file is the best
place to find a list of all available methods. Every method in it is well
documented.

All operations that you can perform on a channel are non-blocking. This means
that it is not possible for a method (like Channel::declareExchange()) to
immediately return 'true' or 'false'. Instead, almost every method of the Channel
class returns an instance of the 'Deferred' class. This 'Deferred' object can be
used to install handlers that will be called in case of success or failure.

For example, if you call the channel.declareExchange() method, the AMQP-CPP library
will send a message to the RabbitMQ message broker to ask it to declare the
exchange. However, because all operations in the library are asynchronous, the
declareExchange() method cannot return 'true' or 'false' to inform you whether
the operation was successful or not. Only after a while, after the instruction
has reached the RabbitMQ server, and the confirmation from the server has been
sent back to the client, can the library report the result of the declareExchange()
call.

To prevent any blocking calls, the channel.declareExchange() method returns a
'Deferred' result object, on which you can set callback functions that will be
called when the operation succeeds or fails.

````c++
// create a channel (or use TcpChannel if you're using the Tcp module)
Channel myChannel(&connection);

// declare an exchange, and install callbacks for success and failure
myChannel.declareExchange("my-exchange")

    .onSuccess([]() {
        // by now the exchange is created
    })

    .onError([](const char *message) {
        // something went wrong creating the exchange
    });
````

As you can see in the above example, we call the declareExchange() method, and
treat its return value as an object, on which we immediately install lambda
callback functions to handle success and to handle failure.

Installing the callback methods is optional. If you're not interested in the
result of an operation, you do not have to install a callback for it. Next
to the onSuccess() and onError() callbacks that can be installed, you can also
install an onFinalize() callback, which is called directly after the onSuccess()
or onError() callback, and which can be used for code that should
run in either case: when the operation succeeds or when it fails.

The signature for the onError() method is always the same: it gets one parameter
with a human readable error message. The onSuccess() function has a different
signature depending on the method that you call. Most onSuccess() functions
(like the one we showed for the declareExchange() method) do not get any
parameters at all. Some specific onSuccess callbacks receive extra parameters
with additional information.

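
To make the order of the callbacks concrete, here is a small, self-contained
sketch. It does not use the real 'Deferred' class: `MiniDeferred`,
`reportSuccess()`, `reportError()` and `runOperation()` are hypothetical names
that only exist in this example, and the actual implementation in the library
is more elaborate. The sketch shows why the calls can be chained (every setter
returns a reference to the same object) and that the finalize callback runs
after either outcome.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// hypothetical, simplified stand-in for a deferred result object
class MiniDeferred
{
private:
    std::function<void()> _success;
    std::function<void(const char *)> _error;
    std::function<void()> _finalize;

public:
    // the setters return *this, which is what makes the calls chainable
    MiniDeferred &onSuccess(std::function<void()> callback) { _success = std::move(callback); return *this; }
    MiniDeferred &onError(std::function<void(const char *)> callback) { _error = std::move(callback); return *this; }
    MiniDeferred &onFinalize(std::function<void()> callback) { _finalize = std::move(callback); return *this; }

    // to be called by whoever executes the operation, when it succeeded
    void reportSuccess()
    {
        if (_success) _success();
        if (_finalize) _finalize();
    }

    // to be called by whoever executes the operation, when it failed
    void reportError(const char *message)
    {
        if (_error) _error(message);
        if (_finalize) _finalize();
    }
};

// helper for this example: run one operation and record which callbacks fire
std::vector<std::string> runOperation(bool succeeds)
{
    std::vector<std::string> log;
    MiniDeferred deferred;

    // install all three callbacks in one chained expression
    deferred
        .onSuccess([&log]() { log.push_back("success"); })
        .onError([&log](const char *message) { log.push_back(std::string("error: ") + message); })
        .onFinalize([&log]() { log.push_back("finalize"); });

    // simulate the answer from the server
    if (succeeds) deferred.reportSuccess();
    else deferred.reportError("exchange could not be declared");

    return log;
}
```

In this sketch `runOperation(true)` records "success" followed by "finalize",
and `runOperation(false)` records the error message followed by "finalize".
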
CHANNEL CALLBACKS
=================

As explained, most channel methods return a 'Deferred' object on which you can
install callbacks using the Deferred::onError() and Deferred::onSuccess() methods.

The callbacks that you install on a Deferred object only apply to one specific
operation. If you want to install a generic error callback for the entire channel,
you can do so by using the Channel::onError() method. Next to the Channel::onError()
method, you can also install a callback to be notified when the channel is ready
for sending the first instruction to RabbitMQ.

````c++
// create a channel (or use TcpChannel if you use the Tcp module)
Channel myChannel(&connection);

// install a generic channel-error handler that will be called for every
// error that occurs on the channel
myChannel.onError([](const char *message) {

    // report error
    std::cout << "channel error: " << message << std::endl;
});

// install a generic callback that will be called when the channel is ready
// for sending the first instruction
myChannel.onReady([]() {

    // send the first instructions (like publishing messages)
});
````

In theory, you should wait for the onReady() callback to be called before you
send any other instructions over the channel. In practice however, the AMQP-CPP
library caches all instructions that were sent too early, so that you can use the
channel object right after it was constructed.

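
The idea behind this kind of caching can be sketched in a few lines. The code
below is not taken from AMQP-CPP and says nothing about how the library stores
its data internally: `EarlyBuffer` and its methods are hypothetical names, and
instructions are represented by plain strings. It only illustrates the
principle that everything that is sent too early is remembered, and flushed in
the original order once the object is ready.

```cpp
#include <queue>
#include <string>
#include <utility>
#include <vector>

// hypothetical stand-in for an object that buffers instructions until it is ready
class EarlyBuffer
{
private:
    bool _ready = false;
    std::queue<std::string> _pending;
    std::vector<std::string> _sent;

public:
    // send an instruction, or remember it when we are not ready yet
    void send(std::string instruction)
    {
        if (_ready) _sent.push_back(std::move(instruction));
        else _pending.push(std::move(instruction));
    }

    // called once the setup has completed: flush everything in the original order
    void ready()
    {
        _ready = true;
        while (!_pending.empty())
        {
            _sent.push_back(std::move(_pending.front()));
            _pending.pop();
        }
    }

    // the instructions that were actually sent so far, in order
    const std::vector<std::string> &sent() const { return _sent; }
};
```
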
CHANNEL ERRORS
==============

It is important to realize that any error that occurs on a channel
invalidates the entire channel, including all subsequent instructions that
were already sent over it. This means that if you call multiple methods in a row,
and the first method fails, none of the subsequent methods will be executed either:

````c++
Channel myChannel(&connection);
myChannel.declareQueue("my-queue");
myChannel.declareExchange("my-exchange");
````

If the first declareQueue() call fails in the example above, the second
myChannel.declareExchange() method will not be executed, even when this
second instruction was already sent to the server. The second instruction will be
ignored by the RabbitMQ server because the channel was already in an invalid
state after the first failure.

You can overcome this by using multiple channels:

````c++
Channel channel1(&connection);
Channel channel2(&connection);
channel1.declareQueue("my-queue");
channel2.declareExchange("my-exchange");
````

Now, if an error occurs with declaring the queue, it will not have consequences
for the other call. But this comes at a small price: setting up the extra channel
requires an extra instruction to be sent to the RabbitMQ server, so some extra
bytes are sent over the network, and some additional resources in both the client
application and the RabbitMQ server are used (although this is all very limited).

If possible, it is best to make use of this feature. For example, if you have an
important AMQP connection that you use for consuming messages, and at the same
time you want to send another instruction to RabbitMQ (like declaring a temporary
queue), it is best to set up a new channel for this 'declare' instruction. If the
declare fails, it will not stop the consumer, because it was sent over a different
channel.

The AMQP-CPP library allows you to create channels on the stack. It is not
a problem if a channel object gets destructed before the instruction was received by
the RabbitMQ server:

````c++
void myDeclareMethod(AMQP::Connection *connection)
{
    // create temporary channel to declare a queue
    AMQP::Channel channel(connection);

    // declare the queue (the channel object is destructed before the
    // instruction reaches the server, but the AMQP-CPP library can deal
    // with this)
    channel.declareQueue("my-new-queue");
}
````

FLAGS AND TABLES
================

Let's take a closer look at one method in the Channel object to explain
two other concepts of this AMQP-CPP library: flags and tables. The method that we
will be looking at is the Channel::declareQueue() method - but we could've
picked a different method too because flags and tables are used by many methods.

````c++
/**
 * Declare a queue
 *
 * If you do not supply a name, a name will be assigned by the server.
 *
 * The flags can be a combination of the following values:
 *
 *  - durable     queue survives a broker restart
 *  - autodelete  queue is automatically removed when all connected consumers are gone
 *  - passive     only check if the queue exists
 *  - exclusive   the queue only exists for this connection, and is automatically removed when connection is gone
 *
 * @param name name of the queue
 * @param flags combination of flags
 * @param arguments optional arguments
 */
DeferredQueue &declareQueue(const std::string &name, int flags, const Table &arguments);
DeferredQueue &declareQueue(const std::string &name, const Table &arguments);
DeferredQueue &declareQueue(const std::string &name, int flags = 0);
DeferredQueue &declareQueue(int flags, const Table &arguments);
DeferredQueue &declareQueue(const Table &arguments);
DeferredQueue &declareQueue(int flags = 0);
````

As you can see, the method comes in many forms, and it is up to you to choose
the one that is most appropriate. We now take a look at the most complete
one, the method with three parameters.

All of the above methods return a 'DeferredQueue' object. The DeferredQueue class
extends from the AMQP::Deferred class and allows you to install a more powerful
onSuccess() callback function. The 'onSuccess' method for the declareQueue()
function gets three arguments:

````c++
// create a custom callback
auto callback = [](const std::string &name, uint32_t msgcount, uint32_t consumercount) {

    // @todo add your own implementation

};

// declare the queue, and install the callback that is called on success
channel.declareQueue("myQueue").onSuccess(callback);
````

Just like many other methods in the Channel class, the declareQueue() method
accepts an integer parameter named 'flags'. This is a variable in which you can
set method-specific options, by summing up all the options that are described in
the documentation above the method. If you for example want to create a durable,
auto-deleted queue, you can pass in the value AMQP::durable + AMQP::autodelete.

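
Summing up works because the options are meant to be combined. The sketch
below assumes, as is usual for such options, that every flag occupies its own
bit. The constants and the `hasFlag()` helper are hypothetical and only serve
as an illustration: the real constants are defined by the library and may have
other values. Under that assumption, the bitwise OR operator gives the same
result as a sum, and stays correct if a flag is accidentally supplied twice.

```cpp
// hypothetical flag values, each flag occupies its own bit (the real
// constants are defined by the library and may have different values)
constexpr int durable    = 0x1;
constexpr int autodelete = 0x2;
constexpr int passive    = 0x8;

// check whether a specific option is set in a combination of flags
constexpr bool hasFlag(int flags, int flag)
{
    return (flags & flag) != 0;
}

// for distinct flags, adding up and OR-ing give the same result
static_assert((durable + autodelete) == (durable | autodelete), "same combination");

// OR-ing stays correct when a flag is supplied twice, adding up does not
static_assert((durable | durable) == durable, "still just durable");
static_assert((durable + durable) == autodelete, "the sum turned into a different flag");
```
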
The declareQueue() method also accepts a parameter named 'arguments', which is of type
Table. This Table object can be used as an associative array to send additional
options to RabbitMQ, that are often custom RabbitMQ extensions to the AMQP
standard. For a list of all supported arguments, take a look at the documentation
on the RabbitMQ website. With every new RabbitMQ release more features and
supported arguments are added.

The Table class is a very powerful class that enables you to build
complicated, deeply nested structures full of strings, arrays and even other
tables. In practice, you only need strings and integers.

````c++
// custom options that are passed to the declareQueue call
AMQP::Table arguments;
arguments["x-dead-letter-exchange"] = "some-exchange";
arguments["x-message-ttl"] = 3600 * 1000;
arguments["x-expires"] = 7200 * 1000;

// declare the queue
channel.declareQueue("my-queue-name", AMQP::durable + AMQP::autodelete, arguments);
````

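
The "x-message-ttl" and "x-expires" arguments are expressed in milliseconds,
which is why the values above are multiplied by 1000. If you prefer not to
write such numbers by hand, a small helper based on std::chrono can do the
conversion. The `toMilliseconds()` function is a hypothetical helper for this
example, it is not provided by AMQP-CPP.

```cpp
#include <chrono>
#include <cstdint>

// convert any std::chrono duration to a number of milliseconds
template <typename Rep, typename Period>
constexpr int64_t toMilliseconds(std::chrono::duration<Rep, Period> duration)
{
    return std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
}

// the values from the example above: one hour and two hours
static_assert(toMilliseconds(std::chrono::hours(1)) == 3600 * 1000, "one hour");
static_assert(toMilliseconds(std::chrono::hours(2)) == 7200 * 1000, "two hours");
```

The helper returns a 64-bit integer, so depending on the value type that you
want to store in the table, a cast to a smaller integer type may be needed.
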
PUBLISHING MESSAGES
===================

Publishing messages is easy, and the Channel class has a list of methods that
can all be used for it. The simplest one takes three arguments: the name of the
exchange to publish to, the routing key to use, and the actual message that
you're publishing - all these parameters are standard C++ strings.

More extended versions of the publish() method exist that accept additional
arguments, and that enable you to publish entire Envelope objects. An envelope
is an object that contains the message plus a list of optional meta properties like
the content-type, content-encoding, priority, expire time and more. None of these
meta fields are interpreted by this library, and RabbitMQ also ignores most
of them, but the AMQP protocol defines them, and they are free for you to use.
For an extensive list of the fields that are supported, take a look at the MetaData.h
header file (MetaData is the base class for Envelope). You should also check the
RabbitMQ documentation to find out if an envelope header is interpreted by the
RabbitMQ server (at the time of this writing, only the expire time is being used).

The following snippet is copied from the Channel.h header file and lists all
available publish() methods. As you can see, you can call the publish() method
in almost any form:

````c++
/**
 * Publish a message to an exchange
 *
 * You have to supply the name of an exchange and a routing key. RabbitMQ will then try
 * to send the message to one or more queues. With the optional flags parameter you can
 * specify what should happen if the message could not be routed to a queue. By default,
 * unroutable messages are silently discarded.
 *
 * If you set the 'mandatory' and/or 'immediate' flag, messages that could not be handled
 * are returned to the application. Make sure that you have called the recall()-method and
 * have set up all appropriate handlers to process these returned messages before you start
 * publishing.
 *
 * The following flags can be supplied:
 *
 *  - mandatory   If set, server returns messages that are not sent to a queue
 *  - immediate   If set, server returns messages that can not immediately be forwarded to a consumer.
 *
 * @param exchange the exchange to publish to
 * @param routingkey the routing key
 * @param envelope the full envelope to send
 * @param message the message to send
 * @param size size of the message
 * @param flags optional flags
 */
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0) { return _implementation->publish(exchange, routingKey, envelope, flags); }
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, size), flags); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
````

Published messages are normally not confirmed by the server, and RabbitMQ
will not send a report back to inform you whether the message was successfully
published or not. But with the flags you can instruct RabbitMQ to send back
the message if it was undeliverable. If you use these flags you must also install
callbacks that will process these bounced messages.
|
2014-01-07 04:40:48 +08:00
|
|
|
|
2018-03-02 06:27:20 +08:00
|
|
|
You can also use transactions to ensure that your messages get delivered.
|
|
|
|
|
Let's say that you are publishing many messages in a row. If you get
|
2014-04-29 22:07:43 +08:00
|
|
|
an error halfway through there is no way to know for sure how many messages made
|
|
|
|
|
it to the broker and how many should be republished. If this is important, you can
|
2014-04-15 20:22:30 +08:00
|
|
|
wrap the publish commands inside a transaction. In this case, if an error occurs,
|
2014-04-29 22:07:43 +08:00
|
|
|
the transaction is automatically rolled back by RabbitMQ and none of the messages
|
2014-04-15 20:22:30 +08:00
|
|
|
are actually published.
|
2014-04-10 19:50:24 +08:00
|
|
|
|
|
|
|
|
````c++
|
2014-04-15 20:22:30 +08:00
|
|
|
// start a transaction
|
2014-04-10 19:50:24 +08:00
|
|
|
channel.startTransaction();
|
2014-04-15 20:22:30 +08:00
|
|
|
|
|
|
|
|
// publish a number of messages
|
2014-04-10 19:50:24 +08:00
|
|
|
channel.publish("my-exchange", "my-key", "my first message");
|
|
|
|
|
channel.publish("my-exchange", "my-key", "another message");
|
2014-04-15 20:22:30 +08:00
|
|
|
|
|
|
|
|
// commit the transaction, and set up callbacks that are called when
|
|
|
|
|
// the transaction was successful or not
|
2014-04-10 19:50:24 +08:00
|
|
|
channel.commitTransaction()
|
2014-04-15 20:22:30 +08:00
|
|
|
.onSuccess([]() {
|
|
|
|
|
// all messages were successfully published
|
|
|
|
|
})
|
2017-05-17 15:22:24 +08:00
|
|
|
.onError([](const char *message) {
|
2014-04-15 20:22:30 +08:00
|
|
|
// none of the messages were published
|
|
|
|
|
// now we have to do it all over again
|
|
|
|
|
});
|
2014-04-10 19:50:24 +08:00
|
|
|
````
|
2014-01-07 04:40:48 +08:00
|
|
|
|
2018-01-27 06:50:41 +08:00
|
|
|
Note that AMQP transactions are not as powerful as the transactions that are
known in the database world. It is not possible to wrap all sorts of
operations in a transaction; they are only meaningful for publishing
and consuming.
|
|
|
|
|
|
2017-06-09 06:07:29 +08:00
|
|
|
PUBLISHER CONFIRMS
|
|
|
|
|
===================
|
|
|
|
|
|
2020-10-23 14:14:26 +08:00
|
|
|
RabbitMQ supports a lightweight method of confirming that the broker received
and processed a message. When you enable this, RabbitMQ sends back an
'ack' for each publish-operation that has been handled. For this method
to work, the channel needs to be put in _confirm mode_. This is done using
the confirmSelect() method. When the channel is successfully put in confirm mode,
the server starts counting the received messages (starting from 1) and sends
acknowledgments for every message it processed (it can also acknowledge
multiple messages at once).
|
|
|
|
|
|
|
|
|
|
If the server is unable to process a message, it will send a negative
acknowledgment. The handling of both positive and negative acknowledgments is
implemented with callbacks that should be installed on the object that
is returned by the confirmSelect() method:
|
2017-06-09 06:07:29 +08:00
|
|
|
|
|
|
|
|
````c++
|
2020-10-23 14:14:26 +08:00
|
|
|
// setup confirm mode and ack/nack callbacks (from this moment onwards
|
|
|
|
|
// ack/nack confirmations are coming in)
|
2018-05-14 18:59:20 +08:00
|
|
|
channel.confirmSelect().onSuccess([&]() {
|
2018-05-15 04:04:36 +08:00
|
|
|
|
2020-10-23 14:14:26 +08:00
|
|
|
// publish the first message (this will be acked/nacked with deliveryTag=1)
|
2017-06-09 06:07:29 +08:00
|
|
|
channel.publish("my-exchange", "my-key", "my first message");
|
|
|
|
|
|
2020-10-23 14:14:26 +08:00
|
|
|
// publish the second message (this will be acked/nacked with deliveryTag=2)
|
2017-06-09 06:07:29 +08:00
|
|
|
channel.publish("my-exchange", "my-key", "my second message");
|
2018-05-15 04:04:36 +08:00
|
|
|
|
2020-07-24 04:00:40 +08:00
|
|
|
}).onAck([&](uint64_t deliveryTag, bool multiple) {
|
2020-10-23 14:14:26 +08:00
|
|
|
|
2020-07-24 04:00:40 +08:00
|
|
|
// deliveryTag is message number
|
|
|
|
|
// multiple is set to true, if all messages UP TO deliveryTag have been processed
|
2020-10-23 14:14:26 +08:00
|
|
|
|
2018-05-15 04:04:36 +08:00
|
|
|
}).onNack([&](uint64_t deliveryTag, bool multiple, bool requeue) {
|
2020-10-23 14:14:26 +08:00
|
|
|
|
2020-07-24 04:00:40 +08:00
|
|
|
// deliveryTag is message number
|
|
|
|
|
// multiple is set to true, if all messages UP TO deliveryTag have not been processed
|
2018-05-15 04:04:36 +08:00
|
|
|
// requeue is to be ignored
|
2020-10-23 14:14:26 +08:00
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
````
|
|
|
|
|
|
|
|
|
|
If you use this feature, you will have to implement your own bookkeeping to
|
|
|
|
|
track which messages have already been acked/nacked, and which messages
|
|
|
|
|
are still being handled. For your convenience however, the AMQP-CPP library
|
|
|
|
|
comes with a number of helper classes that can take over this responsibility.
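
If you do prefer to keep track yourself, the bookkeeping could look roughly like
the sketch below. It does not use the library at all: the class name
`PendingConfirms` and its methods are hypothetical, and it only models the
counting described above (delivery tags start at 1, and `multiple` covers every
tag up to and including the reported one).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical helper (not part of AMQP-CPP): remembers published payloads
// by delivery tag until the broker acks or nacks them.
class PendingConfirms
{
private:
    // tag that the broker will assign to the next publish (starts at 1)
    uint64_t _next = 1;

    // payloads that were published but have not been confirmed yet
    std::map<uint64_t, std::string> _pending;

public:
    // call right after channel.publish(); returns the expected delivery tag
    uint64_t published(const std::string &payload)
    {
        _pending.emplace(_next, payload);
        return _next++;
    }

    // call from the onAck() or onNack() callback; returns the settled payloads
    std::vector<std::string> settle(uint64_t deliveryTag, bool multiple)
    {
        // tags are counted from 1, so 0 is never a valid tag
        assert(deliveryTag > 0);

        std::vector<std::string> result;

        // multiple=true settles everything up to and including deliveryTag,
        // multiple=false settles only that single message
        auto begin = multiple ? _pending.begin() : _pending.find(deliveryTag);
        auto end = _pending.upper_bound(deliveryTag);

        // nothing pending, or an unknown single tag
        if (begin == _pending.end()) return result;

        for (auto iter = begin; iter != end; ++iter) result.push_back(iter->second);
        _pending.erase(begin, end);
        return result;
    }

    // number of messages that are still waiting for a confirmation
    size_t size() const { return _pending.size(); }
};
```

In the onAck() callback you would discard the payloads returned by `settle()`,
while in the onNack() callback you would typically republish or log them.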
|
|
|
|
|
|
|
|
|
|
The AMQP::Reliable class is an optional wrapper around channels. When you use
it, your underlying channel is automatically put in _confirm mode_, and all publish
operations are individually acknowledged:
|
|
|
|
|
|
|
|
|
|
````c++
|
|
|
|
|
// create a channel
|
|
|
|
|
AMQP::TcpChannel mychannel(connection);
|
|
|
|
|
|
|
|
|
|
// wrap the channel into a reliable-object so that publish-operations are
|
|
|
|
|
// individually confirmed (after wrapping the channel, it is recommended
|
|
|
|
|
// to no longer make direct calls to the channel)
|
|
|
|
|
AMQP::Reliable reliable(mychannel);
|
|
|
|
|
|
|
|
|
|
// publish a message via the reliable-channel
|
|
|
|
|
reliable.publish("my-exchange", "my-key", "my first message").onAck([]() {
|
|
|
|
|
|
|
|
|
|
// the message has been acknowledged by RabbitMQ (in your application
|
|
|
|
|
// code you can now safely discard the message as it has been picked up)
|
|
|
|
|
|
|
|
|
|
}).onNack([]() {
|
|
|
|
|
|
|
|
|
|
// the message has _explicitly_ been nack'ed by RabbitMQ (in your application
|
|
|
|
|
// code you probably want to log or handle this to avoid data-loss)
|
|
|
|
|
|
|
|
|
|
}).onError([](const char *message) {
|
|
|
|
|
|
|
|
|
|
// a channel-error occurred before any ack or nack was received, and the
|
|
|
|
|
// message is probably lost too (which you want to handle)
|
|
|
|
|
|
|
|
|
|
}).onLost([]() {
|
|
|
|
|
|
|
|
|
|
// because the implementation for onNack() and onError() will be the same
|
|
|
|
|
// in many applications, you can also choose to install an onLost() handler,
|
|
|
|
|
// which is called when the message has either been nack'ed, or lost.
|
|
|
|
|
|
2017-06-09 06:07:29 +08:00
|
|
|
});
|
2018-05-15 04:04:36 +08:00
|
|
|
|
2017-06-09 06:07:29 +08:00
|
|
|
````
|
|
|
|
|
|
2020-10-23 14:14:26 +08:00
|
|
|
In the above example we have implemented four callback methods. In a real life
|
|
|
|
|
application, implementing the onAck() and onLost() is normally sufficient.
|
|
|
|
|
|
|
|
|
|
Publisher-confirms are often useful in situations where you need reliability.
|
|
|
|
|
If you want to have certainty about whether your message was handled by RabbitMQ
|
|
|
|
|
or not, you can enable this feature, either by explicitly calling channel.confirmSelect()
|
|
|
|
|
if you want to do your own bookkeeping, or using AMQP::Reliable for a simpler
|
|
|
|
|
API.
|
|
|
|
|
|
|
|
|
|
But it is also useful for flood prevention. RabbitMQ turns out not to be very
good at handling big loads of publish-operations. If you publish messages faster
than RabbitMQ can handle, a server-side buffer builds up, and RabbitMQ gets slow
(which causes the buffer to build up even further, et cetera). With publisher-confirms
you can keep the messages in your own application, and only proceed with publishing
them when your previous messages have been handled. With this approach you
prevent RabbitMQ from getting overloaded. We call it throttling.
|
|
|
|
|
|
|
|
|
|
You can build your own throttling mechanism using the confirmSelect() approach
|
|
|
|
|
or the AMQP::Reliable class. But also here the AMQP-CPP library already has
|
|
|
|
|
a utility class for you that you can use instead: AMQP::Throttle:
|
|
|
|
|
|
|
|
|
|
````c++
|
|
|
|
|
// create a channel
|
|
|
|
|
AMQP::TcpChannel mychannel(connection);
|
|
|
|
|
|
|
|
|
|
// create a throttle (do not publish more than 20 messages at once) (after
|
|
|
|
|
// wrapping the channel in a throttle you should no longer call any of the
|
|
|
|
|
// channel-methods directly)
|
|
|
|
|
AMQP::Throttle throttle(mychannel, 20);
|
|
|
|
|
|
|
|
|
|
// publish way more messages than RabbitMQ can handle (the Throttle class
|
|
|
|
|
// will make sure that messages are buffered inside your application if
|
|
|
|
|
// there are more than 20 unacked messages)
|
|
|
|
|
for (size_t i = 0; i < 100000; ++i)
|
|
|
|
|
{
|
|
|
|
|
// publish a message
|
|
|
|
|
throttle.publish("my-exchange", "my-key", "my first message");
|
|
|
|
|
}
|
|
|
|
|
````
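
To make the idea of throttling more concrete, the following stand-alone sketch
shows the principle without using the library. The names `SimpleThrottle`,
`send` and `onAck()` are hypothetical, and the real AMQP::Throttle class may
well be implemented differently: this only illustrates that no more than a fixed
number of messages is in flight, and that the rest waits inside the application.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <utility>

// Hypothetical stand-in for a throttle: at most `limit` messages are in
// flight (sent but not yet acked), all other messages are buffered locally.
class SimpleThrottle
{
private:
    // function that really sends a message (e.g. a wrapper around publish)
    std::function<void(const std::string &)> _send;

    // max number of unacked messages
    size_t _limit;

    // current number of unacked messages
    size_t _inflight = 0;

    // messages that are waiting for a free slot
    std::queue<std::string> _buffer;

public:
    SimpleThrottle(std::function<void(const std::string &)> send, size_t limit) :
        _send(std::move(send)), _limit(limit)
    {
        // a limit of zero would never send anything
        assert(_limit > 0);
    }

    // send right away if there is room, otherwise keep the message for later
    void publish(const std::string &message)
    {
        if (_inflight < _limit) { ++_inflight; _send(message); }
        else _buffer.push(message);
    }

    // to be called for every confirmation: frees a slot and sends the next
    // buffered message, if there is one
    void onAck()
    {
        assert(_inflight > 0);
        --_inflight;
        if (_buffer.empty()) return;
        std::string next = std::move(_buffer.front());
        _buffer.pop();
        ++_inflight;
        _send(next);
    }

    size_t inflight() const { return _inflight; }
    size_t buffered() const { return _buffer.size(); }
};
```

With a limit of 2 and five calls to `publish()`, two messages are sent
immediately and three stay in the buffer; every call to `onAck()` then moves one
buffered message onto the wire.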
|
|
|
|
|
|
|
|
|
|
The AMQP::Reliable and AMQP::Throttle classes are both wrappers around a channel.
But what if you want to use both? What if you want to throttle messages, but would
also like to install your own callbacks for onAck and onLost? This is possible too:
|
|
|
|
|
|
|
|
|
|
````c++
|
|
|
|
|
// create a channel
|
|
|
|
|
AMQP::TcpChannel mychannel(connection);
|
|
|
|
|
|
|
|
|
|
// create a throttle that allows reliable-publishing
|
|
|
|
|
AMQP::Throttle<AMQP::Reliable> throttle(mychannel, 20);
|
|
|
|
|
|
|
|
|
|
// publish way more messages than RabbitMQ can handle (the Throttle class
|
|
|
|
|
// will make sure that messages are buffered inside your application if
|
|
|
|
|
// there are more than 20 unacked messages)
|
|
|
|
|
for (size_t i = 0; i < 100000; ++i)
|
|
|
|
|
{
|
|
|
|
|
// publish a message
|
|
|
|
|
throttle.publish("my-exchange", "my-key", "my first message").onAck([]() {
|
|
|
|
|
|
|
|
|
|
// @todo add your own code
|
|
|
|
|
|
|
|
|
|
}).onLost([]() {
|
|
|
|
|
|
|
|
|
|
// @todo add your own code
|
|
|
|
|
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
````
|
|
|
|
|
|
2017-06-09 06:18:00 +08:00
|
|
|
For more information, please see http://www.rabbitmq.com/confirms.html.
|
2014-01-07 01:19:49 +08:00
|
|
|
|
|
|
|
|
CONSUMING MESSAGES
|
|
|
|
|
==================
|
|
|
|
|
|
2014-01-07 16:41:16 +08:00
|
|
|
Fetching messages from RabbitMQ is called consuming, and can be started by calling
|
|
|
|
|
the method Channel::consume(). After you've called this method, RabbitMQ starts
|
|
|
|
|
delivering messages to you.
|
|
|
|
|
|
|
|
|
|
Just like the publish() method that we just described, the consume() method also
|
2014-01-07 18:30:01 +08:00
|
|
|
comes in many forms. The first parameter is always the name of the queue you would like
|
|
|
|
|
to consume from. The subsequent parameters are an optional consumer tag, flags and
|
|
|
|
|
a table with custom arguments. The first additional parameter, the consumer tag,
|
2014-04-15 20:22:30 +08:00
|
|
|
is nothing more than a string identifier that you can use when you want to stop
|
|
|
|
|
consuming.
|
2014-01-07 16:41:16 +08:00
|
|
|
|
|
|
|
|
The full documentation from the C++ Channel.h header file looks like this:
|
|
|
|
|
|
|
|
|
|
````c++
|
|
|
|
|
/**
|
|
|
|
|
* Tell the RabbitMQ server that we're ready to consume messages
|
2014-04-10 19:50:24 +08:00
|
|
|
*
|
2014-01-07 16:41:16 +08:00
|
|
|
* After this method is called, RabbitMQ starts delivering messages to the client
|
|
|
|
|
* application. The consume tag is a string identifier that will be passed to
|
2014-04-10 19:50:24 +08:00
|
|
|
* each received message, so that you can associate incoming messages with a
|
2014-01-07 16:41:16 +08:00
|
|
|
* consumer. If you do not specify a consumer tag, the server will assign one
|
|
|
|
|
* for you.
|
2014-04-10 19:50:24 +08:00
|
|
|
*
|
2014-01-07 16:41:16 +08:00
|
|
|
* The following flags are supported:
|
2014-04-10 19:50:24 +08:00
|
|
|
*
|
2014-04-29 22:07:43 +08:00
|
|
|
* - nolocal if set, messages published on this channel are
|
2014-04-15 20:22:30 +08:00
|
|
|
* not also consumed
|
|
|
|
|
*
|
2014-04-29 22:07:43 +08:00
|
|
|
* - noack if set, consumed messages do not have to be acked,
|
2014-04-15 20:22:30 +08:00
|
|
|
* this happens automatically
|
|
|
|
|
*
|
2014-04-29 22:07:43 +08:00
|
|
|
* - exclusive request exclusive access, only this consumer can
|
2014-04-15 20:22:30 +08:00
|
|
|
* access the queue
|
|
|
|
|
*
|
2015-01-26 21:47:30 +08:00
|
|
|
* The callback registered with DeferredConsumer::onSuccess() will be called when the
|
2014-04-29 22:07:43 +08:00
|
|
|
* consumer has started.
|
2014-04-10 19:50:24 +08:00
|
|
|
*
|
2014-01-07 16:41:16 +08:00
|
|
|
* @param queue the queue from which you want to consume
|
|
|
|
|
* @param tag a consumer tag that will be associated with this consume operation
|
|
|
|
|
* @param flags additional flags
|
|
|
|
|
* @param arguments additional arguments
|
|
|
|
|
* @return bool
|
|
|
|
|
*/
|
2014-04-15 20:22:30 +08:00
|
|
|
DeferredConsumer &consume(const std::string &queue, const std::string &tag, int flags, const AMQP::Table &arguments);
|
|
|
|
|
DeferredConsumer &consume(const std::string &queue, const std::string &tag, int flags = 0);
|
|
|
|
|
DeferredConsumer &consume(const std::string &queue, const std::string &tag, const AMQP::Table &arguments);
|
|
|
|
|
DeferredConsumer &consume(const std::string &queue, int flags, const AMQP::Table &arguments);
|
|
|
|
|
DeferredConsumer &consume(const std::string &queue, int flags = 0);
|
|
|
|
|
DeferredConsumer &consume(const std::string &queue, const AMQP::Table &arguments);
|
2014-01-07 16:41:16 +08:00
|
|
|
````
|
|
|
|
|
|
2014-04-29 22:07:43 +08:00
|
|
|
As you can see, the consume method returns a DeferredConsumer. This object is a
|
|
|
|
|
regular Deferred, with additions. The onSuccess() method of a
|
2014-04-15 20:22:30 +08:00
|
|
|
DeferredConsumer is slightly different than the onSuccess() method of a regular
|
|
|
|
|
Deferred object: one extra parameter will be supplied to your callback function
|
|
|
|
|
with the consumer tag.
|
2014-04-10 19:50:24 +08:00
|
|
|
|
2014-04-15 20:22:30 +08:00
|
|
|
The onSuccess() callback will be called when the consume operation _has started_,
|
2014-04-29 22:07:43 +08:00
|
|
|
but not when messages are actually consumed. For this you will have to install
|
2014-04-15 20:22:30 +08:00
|
|
|
a different callback, using the onReceived() method.
|
2014-01-07 16:41:16 +08:00
|
|
|
|
|
|
|
|
````c++
|
2014-04-15 20:22:30 +08:00
|
|
|
// callback function that is called when the consume operation starts
|
|
|
|
|
auto startCb = [](const std::string &consumertag) {
|
2014-04-10 19:50:24 +08:00
|
|
|
|
2014-04-15 20:22:30 +08:00
|
|
|
std::cout << "consume operation started" << std::endl;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// callback function that is called when the consume operation failed
|
|
|
|
|
auto errorCb = [](const char *message) {
|
|
|
|
|
|
|
|
|
|
std::cout << "consume operation failed" << std::endl;
|
2017-03-12 00:52:57 +08:00
|
|
|
};
|
2014-04-15 20:22:30 +08:00
|
|
|
|
|
|
|
|
// callback operation when a message was received
|
|
|
|
|
auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
|
|
|
|
|
|
|
|
|
|
std::cout << "message received" << std::endl;
|
2014-04-29 22:07:43 +08:00
|
|
|
|
2014-04-15 20:22:30 +08:00
|
|
|
// acknowledge the message
|
|
|
|
|
channel.ack(deliveryTag);
|
2017-03-12 00:52:57 +08:00
|
|
|
};
|
2014-04-15 20:22:30 +08:00
|
|
|
|
|
|
|
|
// start consuming from the queue, and install the callbacks
|
|
|
|
|
channel.consume("my-queue")
|
2016-04-25 15:08:30 +08:00
|
|
|
.onReceived(messageCb)
|
2014-04-15 20:22:30 +08:00
|
|
|
.onSuccess(startCb)
|
2016-04-25 15:08:30 +08:00
|
|
|
.onError(errorCb);
|
2014-04-10 19:50:24 +08:00
|
|
|
|
2014-01-07 16:41:16 +08:00
|
|
|
````
|
|
|
|
|
|
2014-04-29 22:07:43 +08:00
|
|
|
The Message object holds all information of the delivered message: the actual
|
|
|
|
|
content, all meta information from the envelope (in fact, the Message class is
|
|
|
|
|
derived from the Envelope class), and even the name of the exchange and the
|
|
|
|
|
routing key that were used when the message was originally published. For a full
|
2014-04-15 20:22:30 +08:00
|
|
|
list of all information in the Message class, it is best to have a look at the
|
2014-01-07 16:41:16 +08:00
|
|
|
message.h, envelope.h and metadata.h header files.
|
|
|
|
|
|
2014-04-29 22:07:43 +08:00
|
|
|
Another important parameter to the onReceived() method is the deliveryTag parameter.
|
|
|
|
|
This is a unique identifier that you need to acknowledge an incoming message.
|
|
|
|
|
RabbitMQ only removes the message after it has been acknowledged, so that if your
|
|
|
|
|
application crashes while it was busy processing the message, the message does
|
|
|
|
|
not get lost but remains in the queue. But this means that after you've processed
|
|
|
|
|
the message, you must inform RabbitMQ about it by calling the Channel::ack() method.
|
2014-04-15 20:22:30 +08:00
|
|
|
This method is very simple and takes in its simplest form only one parameter: the
|
2014-01-07 16:41:16 +08:00
|
|
|
deliveryTag of the message.
|
|
|
|
|
|
2014-04-29 22:07:43 +08:00
|
|
|
Consuming messages is a continuous process. RabbitMQ keeps sending messages, until
|
|
|
|
|
you stop the consumer, which can be done by calling the Channel::cancel() method.
|
2014-04-15 20:22:30 +08:00
|
|
|
If you close the channel, or the entire TCP connection, consuming also stops.
|
|
|
|
|
|
2014-04-29 22:07:43 +08:00
|
|
|
RabbitMQ throttles the number of messages that are delivered to you, to prevent
your application from being flooded with messages from the queue, and to spread out
the messages over multiple consumers. This is done with a setting called
quality-of-service (QOS). The QOS setting is a numeric value which holds the number
of unacknowledged messages that you are allowed to have. RabbitMQ stops sending
additional messages when the number of unacknowledged messages has reached this
limit, and only sends additional messages when an earlier message gets acknowledged.
To change the QOS, you can simply call Channel::setQos().
|
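
The interplay between the QOS limit and acknowledgments can be illustrated with
a small model of a queue with a single consumer. This is only a sketch under
assumptions: `MiniQueue` and all of its methods are hypothetical, nothing here
is library or broker code, and a real broker is more subtle about the order in
which it requeues messages.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <map>
#include <string>

// Hypothetical model of one queue with one consumer (not library code)
class MiniQueue
{
private:
    struct Entry { std::string body; bool redelivered; };

    size_t _prefetch;                     // the QOS value
    uint64_t _nextTag = 1;                // tag for the next delivery
    std::deque<Entry> _ready;             // messages waiting for delivery
    std::map<uint64_t, Entry> _unacked;   // delivered, but not yet acked

public:
    explicit MiniQueue(size_t prefetch) : _prefetch(prefetch) { assert(prefetch > 0); }

    // a publisher adds a message to the queue
    void push(const std::string &body) { _ready.push_back(Entry{body, false}); }

    // deliver as many messages as the QOS limit allows; returns the count
    size_t deliver()
    {
        size_t count = 0;
        while (!_ready.empty() && _unacked.size() < _prefetch)
        {
            _unacked.emplace(_nextTag++, _ready.front());
            _ready.pop_front();
            ++count;
        }
        return count;
    }

    // the consumer acknowledged a message: only now is it really gone
    bool ack(uint64_t deliveryTag) { return _unacked.erase(deliveryTag) == 1; }

    // the consumer crashed or disconnected: unacked messages are not lost,
    // they return to the front of the queue and are marked as redelivered
    void consumerLost()
    {
        for (auto iter = _unacked.rbegin(); iter != _unacked.rend(); ++iter)
        {
            _ready.push_front(Entry{iter->second.body, true});
        }
        _unacked.clear();
    }

    size_t ready() const { return _ready.size(); }
    size_t unacked() const { return _unacked.size(); }
    bool frontRedelivered() const { return !_ready.empty() && _ready.front().redelivered; }
};
```

With a prefetch of 2 and three queued messages, the first `deliver()` hands out
two messages and a second call hands out none, until one of the two is acked.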
2014-01-07 05:24:14 +08:00
|
|
|
|
|
|
|
|
|
2018-11-08 20:30:27 +08:00
|
|
|
UPGRADING
|
|
|
|
|
=========
|
|
|
|
|
|
|
|
|
|
AMQP-CPP 4.* is not always compatible with previous versions. Especially some
|
|
|
|
|
virtual methods in the ConnectionHandler and TcpHandler classes have been renamed
|
|
|
|
|
or are called during a different stage in the connection lifetime. Check
|
|
|
|
|
out this README file and the comments inside the connectionhandler.h and
|
|
|
|
|
tcphandler.h files to find out if your application has to be changed. You
|
|
|
|
|
should especially check the following:
|
|
|
|
|
|
|
|
|
|
- ConnectionHandler::onConnected has been renamed to ConnectionHandler::onReady
|
|
|
|
|
- TcpHandler::onConnected is now called sooner: when the TCP connection is
|
|
|
|
|
established, instead of when the AMQP connection is ready for instructions.
|
|
|
|
|
- The new method TcpHandler::onReady is called when the AMQP connection is
|
|
|
|
|
ready to be used (this is the old behavior of TcpHandler::onConnected)
|
|
|
|
|
- TcpHandler::onError is no longer the last method that is called (TcpHandler::onLost
|
|
|
|
|
could be called and TcpHandler::onDetached will be called after the error too)
|
|
|
|
|
- TcpHandler::onClosed is now called to indicate the graceful end of the
|
|
|
|
|
AMQP protocol, and not the end of TCP connection.
|
|
|
|
|
- TcpHandler::onLost is called when the TCP connection is lost or closed.
|
|
|
|
|
- The new method TcpHandler::onDetached is a better alternative for cleanup
|
|
|
|
|
code instead of TcpHandler::onClosed and/or TcpHandler::onError.
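
The consequence of the list above for your cleanup code can be sketched as
follows. This example deliberately does not include the library: `HandlerSketch`
is a hypothetical stand-in that only borrows the method names (the real
TcpHandler methods take extra parameters, such as the connection), and the call
order in `simulateFailure()` is an assumption derived from the description
above, not something taken from the library source.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for the handler interface (method names only)
struct HandlerSketch
{
    virtual ~HandlerSketch() = default;
    virtual void onConnected() {}   // TCP connection is established
    virtual void onReady() {}       // AMQP connection is ready for instructions
    virtual void onError() {}       // something went wrong
    virtual void onClosed() {}      // graceful end of the AMQP protocol
    virtual void onLost() {}        // TCP connection was lost or closed
    virtual void onDetached() {}    // final call, suitable for cleanup
};

// An application handler that does its cleanup in onDetached() only
struct MyHandler : public HandlerSketch
{
    std::vector<std::string> log;
    bool cleaned = false;

    void onReady() override { log.push_back("ready"); }
    void onError() override { log.push_back("error"); }
    void onDetached() override
    {
        // cleanup is expected to run exactly once
        assert(!cleaned);
        log.push_back("detached");
        cleaned = true;
    }
};

// Assumed call order for a connection that fails after it became ready
inline void simulateFailure(HandlerSketch &handler)
{
    handler.onConnected();
    handler.onReady();
    handler.onError();
    handler.onLost();
    handler.onDetached();
}
```

Code that used to clean up in onError() or onClosed() would, under this
assumption, run too early in version 4, because other methods can still be
called afterwards.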
|
|
|
|
|
|
|
|
|
|
|
2014-01-05 01:42:58 +08:00
|
|
|
WORK IN PROGRESS
|
|
|
|
|
================
|
|
|
|
|
|
2014-01-06 22:49:31 +08:00
|
|
|
Almost all AMQP features have been implemented. But the following things might
|
|
|
|
|
need additional attention:
|
2014-01-06 04:26:41 +08:00
|
|
|
|
2014-01-06 22:49:31 +08:00
|
|
|
- ability to set up secure connections (or is this fully done on the IO level)
|
|
|
|
|
- login with other protocols than login/password
|
2014-01-07 01:19:49 +08:00
|
|
|
|
2014-04-10 19:50:24 +08:00
|
|
|
We also need to add more safety checks so that strange or invalid data from
|
|
|
|
|
RabbitMQ does not break the library (although in reality RabbitMQ only sends
|
2014-01-06 22:49:31 +08:00
|
|
|
valid data). Also, when we now receive an answer from RabbitMQ that does not
|
2014-04-10 19:50:24 +08:00
|
|
|
match the request that we sent before, we do not report an error (this is also
|
2014-01-06 22:49:31 +08:00
|
|
|
an issue that only occurs in theory).
|
|
|
|
|
|
|
|
|
|
It would be nice to have sample implementations for the ConnectionHandler
|
2014-01-06 04:26:41 +08:00
|
|
|
class that can be directly plugged into libev, libevent and libuv event loops.
|
|
|
|
|
|
|
|
|
|
For performance reasons, we need to investigate if we can limit the number of times
|
|
|
|
|
an incoming or outgoing message is copied.
|