Merge pull request #1 from CopernicaMarketingSoftware/master

update 'master' branch from main repo
This commit is contained in:
Steven G 2018-01-31 15:19:04 +01:00 committed by GitHub
commit fff1c65854
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
117 changed files with 823 additions and 654 deletions

5
.gitignore vendored
View File

@ -13,3 +13,8 @@
*.la *.la
*.a *.a
*.a.* *.a.*
/build
/.vscode
.atom-build.cson
.atom-dbg.cson
/bin

91
.travis.yml Normal file
View File

@ -0,0 +1,91 @@
################
# Config
################
# C++ project
language: cpp
dist: trusty
sudo: required
group: edge
################
# Services
################
services:
- docker
################
# Build matrix
################
matrix:
include:
################
# Linux / GCC
################
- os: linux
env:
- COMPILER_PACKAGE=g++-6
- C_COMPILER=gcc-6
- CXX_COMPILER=g++-6
- os: linux
compiler: gcc
env:
- COMPILER_PACKAGE=g++-7
- C_COMPILER=gcc-7
- CXX_COMPILER=g++-7
################
# Linux / Clang
################
- os: linux
env:
- COMPILER_PACKAGE=clang-4.0
- C_COMPILER=clang-4.0
- CXX_COMPILER=clang++-4.0
- os: linux
env:
- COMPILER_PACKAGE=clang-5.0
- C_COMPILER=clang-5.0
- CXX_COMPILER=clang++-5.0
before_install:
# Show OS/compiler version (this may not be the same OS as the Docker container)
- uname -a
# Use an artful container - gives us access to latest compilers.
- docker run -d --name ubuntu-test-container -v $(pwd):/travis ubuntu:artful tail -f /dev/null
- docker ps
install:
# Create our container
- docker exec -t ubuntu-test-container bash -c "apt-get update -y &&
apt-get --no-install-recommends install -y software-properties-common cmake
ninja-build libboost-all-dev libev-dev libuv1-dev ninja-build $COMPILER_PACKAGE &&
apt-get -y clean && rm -rf /var/lib/apt/lists/*"
################
# Build / Test
################
script:
# Run the container that we created and build the code
- docker exec -t ubuntu-test-container bash -c "cd /travis &&
export CC=/usr/bin/$C_COMPILER &&
export CXX=/usr/bin/$CXX_COMPILER &&
mkdir build.release && cd build.release &&
cmake .. ${CMAKE_OPTIONS} -DAMQP-CPP_BUILD_EXAMPLES=ON -DAMQP-CPP_LINUX_TCP=ON -GNinja &&
cmake --build . --config Release &&
cd .."

View File

@ -1,11 +1,36 @@
cmake_minimum_required(VERSION 2.8) # Builds AMQP-CPP
#
# Options:
#
# - AMQP-CPP_BUILD_SHARED (default OFF)
# ON: Build shared lib
# OFF: Build static lib
#
# - AMQP-CPP_LINUX_TCP (default OFF)
# ON: Build posix handler implementation
# OFF: Don't build posix handler implementation
cmake_minimum_required(VERSION 3.1)
# project name
project(amqpcpp) project(amqpcpp)
# ensure c++11 on all compilers # build options
include(set_cxx_norm.cmake) option(AMQP-CPP_BUILD_SHARED "Build shared library. If off, build will be static." OFF)
set_cxx_norm (${CXX_NORM_CXX11}) option(AMQP-CPP_LINUX_TCP "Build linux sockets implementation." OFF)
option(AMQP-CPP_BUILD_EXAMPLES "Build amqpcpp examples" OFF)
# ensure c++11 on all compilers
set (CMAKE_CXX_STANDARD 11)
# add source files
# ------------------------------------------------------------------------------------------------------
# set include/ as include directory
include_directories(${CMAKE_SOURCE_DIR}/include)
# macro that adds a list of provided source files to a list called SRCS.
# if variable SRCS does not yet exist, it is created.
macro (add_sources) macro (add_sources)
file (RELATIVE_PATH _relPath "${PROJECT_SOURCE_DIR}" "${CMAKE_CURRENT_SOURCE_DIR}") file (RELATIVE_PATH _relPath "${PROJECT_SOURCE_DIR}" "${CMAKE_CURRENT_SOURCE_DIR}")
foreach (_src ${ARGN}) foreach (_src ${ARGN})
@ -21,36 +46,59 @@ macro (add_sources)
endif() endif()
endmacro() endmacro()
# add source files
add_subdirectory(src) add_subdirectory(src)
add_subdirectory(include) if(AMQP-CPP_LINUX_TCP)
add_subdirectory(src/linux_tcp)
option(BUILD_SHARED "build shared library" OFF)
if(BUILD_SHARED)
add_library(amqpcpp SHARED ${SRCS})
set_target_properties(amqpcpp PROPERTIES SOVERSION 2.7)
install(TARGETS amqpcpp
LIBRARY DESTINATION lib
)
else()
add_library(amqpcpp STATIC ${SRCS})
install(TARGETS amqpcpp
ARCHIVE DESTINATION lib
)
endif()
Include_directories(${PROJECT_SOURCE_DIR})
install(DIRECTORY include/ DESTINATION include/amqpcpp
FILES_MATCHING PATTERN "*.h")
install(FILES amqpcpp.h DESTINATION include)
option(BUILD_TUTORIALS "build rabbitmq tutorials" OFF)
if(BUILD_TUTORIALS)
# add_subdirectory(examples/rabbitmq_tutorials)
endif() endif()
set(AMQP-CPP_INCLUDE_PATH ${CMAKE_CURRENT_SOURCE_DIR}) # potentially build the examples
set(AMQP-CPP_INCLUDE_PATH ${CMAKE_CURRENT_SOURCE_DIR} PARENT_SCOPE) if(AMQP-CPP_BUILD_EXAMPLES)
add_subdirectory(examples)
endif()
# settings for specific compilers
# ------------------------------------------------------------------------------------------------------
# we have to prevent windows from defining the max macro.
if (WIN32) if (WIN32)
add_definitions(-DNOMINMAX -DWIN32_LEAN_AND_MEAN) add_definitions(-DNOMINMAX)
endif() endif()
# build targets
# ------------------------------------------------------------------------------------------------------
# set output directory
set(LIBRARY_OUTPUT_PATH ${CMAKE_SOURCE_DIR}/bin)
if(AMQP-CPP_BUILD_SHARED)
# create shared lib
add_library(${PROJECT_NAME} SHARED ${SRCS})
# set shared lib version
set_target_properties(${PROJECT_NAME} PROPERTIES SOVERSION 2.8)
else()
# create static lib
add_library(${PROJECT_NAME} STATIC ${SRCS})
endif()
# install rules
# ------------------------------------------------------------------------------------------------------
if(AMQP-CPP_BUILD_SHARED)
# copy shared lib and its static counter part
install(TARGETS ${PROJECT_NAME}
ARCHIVE DESTINATION lib
LIBRARY DESTINATION lib
RUNTIME DESTINATION lib
)
else()
# copy static lib
install(TARGETS ${PROJECT_NAME}
ARCHIVE DESTINATION lib
)
endif()
# copy header files
install(DIRECTORY include/amqpcpp/ DESTINATION include/amqpcpp
FILES_MATCHING PATTERN "*.h")
install(FILES include/amqpcpp.h DESTINATION include)

View File

@ -25,9 +25,11 @@ clean:
install: install:
mkdir -p ${INCLUDE_DIR}/$(LIBRARY_NAME) mkdir -p ${INCLUDE_DIR}/$(LIBRARY_NAME)
mkdir -p ${INCLUDE_DIR}/$(LIBRARY_NAME)/linux_tcp
mkdir -p ${LIBRARY_DIR} mkdir -p ${LIBRARY_DIR}
cp -f $(LIBRARY_NAME).h ${INCLUDE_DIR} cp -f include/$(LIBRARY_NAME).h ${INCLUDE_DIR}
cp -f include/*.h ${INCLUDE_DIR}/$(LIBRARY_NAME) cp -f include/amqpcpp/*.h ${INCLUDE_DIR}/$(LIBRARY_NAME)
cp -f include/amqpcpp/linux_tcp/*.h ${INCLUDE_DIR}/$(LIBRARY_NAME)/linux_tcp
-cp -f src/lib$(LIBRARY_NAME).so.$(VERSION) ${LIBRARY_DIR} -cp -f src/lib$(LIBRARY_NAME).so.$(VERSION) ${LIBRARY_DIR}
-cp -f src/lib$(LIBRARY_NAME).a.$(VERSION) ${LIBRARY_DIR} -cp -f src/lib$(LIBRARY_NAME).a.$(VERSION) ${LIBRARY_DIR}
ln -r -s -f $(LIBRARY_DIR)/lib$(LIBRARY_NAME).so.$(VERSION) $(LIBRARY_DIR)/lib$(LIBRARY_NAME).so.$(SONAME) ln -r -s -f $(LIBRARY_DIR)/lib$(LIBRARY_NAME).so.$(VERSION) $(LIBRARY_DIR)/lib$(LIBRARY_NAME).so.$(SONAME)

186
README.md
View File

@ -1,37 +1,39 @@
AMQP-CPP AMQP-CPP
======== ========
[![Build Status](https://travis-ci.org/CopernicaMarketingSoftware/AMQP-CPP.svg?branch=master)](https://travis-ci.org/CopernicaMarketingSoftware/AMQP-CPP)
AMQP-CPP is a C++ library for communicating with a RabbitMQ message broker. The AMQP-CPP is a C++ library for communicating with a RabbitMQ message broker. The
library can be used to parse incoming data from a RabbitMQ server, and to library can be used to parse incoming data from a RabbitMQ server, and to
generate frames that can be sent to a RabbitMQ server. generate frames that can be sent to a RabbitMQ server.
This library has a layered architecture, and allows you - if you like - to This library has a layered architecture, and allows you - if you like - to
completely take care of the network layer. If you want to set up and manage the completely take care of the network layer. If you want to set up and manage the
network connections yourself, the AMQP-CPP library will not make a connection to network connections yourself, the AMQP-CPP library will not make a connection to
RabbitMQ by itself, nor will it create sockets and/or perform IO operations. As RabbitMQ by itself, nor will it create sockets and/or perform IO operations. As
a user of this library, you create the socket connection and implement a certain a user of this library, you create the socket connection and implement a certain
interface that you pass to the AMQP-CPP library and that the library will use interface that you pass to the AMQP-CPP library and that the library will use
for IO operations. for IO operations.
Intercepting this network layer is however optional, the AMQP-CPP library also Intercepting this network layer is however optional, the AMQP-CPP library also
comes with a predefined Tcp module that can be used if you trust the AMQP library comes with a predefined Tcp module that can be used if you trust the AMQP library
to take care of the network handling. In that case, the AMQP-CPP library does to take care of the network handling. In that case, the AMQP-CPP library does
all the system calls to set up network connections and send and receive the data. all the system calls to set up network connections and send and receive the data.
This layered architecture makes the library extremely flexible and portable: it This layered architecture makes the library extremely flexible and portable: it
does not necessarily rely on operating system specific IO calls, and can be does not necessarily rely on operating system specific IO calls, and can be
easily integrated into any kind of event loop. If you want to implement the AMQP easily integrated into any kind of event loop. If you want to implement the AMQP
protocol on top of some [unusual other communication layer](https://tools.ietf.org/html/rfc1149), protocol on top of some [unusual other communication layer](https://tools.ietf.org/html/rfc1149),
this library can be used for that - but if you want to use it with regular TCP this library can be used for that - but if you want to use it with regular TCP
connections, setting it up is just as easy. connections, setting it up is just as easy.
AMQP-CPP is fully asynchronous and does not do any blocking (system) calls, so AMQP-CPP is fully asynchronous and does not do any blocking (system) calls, so
it can be used in high performance applications without the need for threads. it can be used in high performance applications without the need for threads.
The AMQP-CPP library uses C++11 features, so if you intend use it, please make The AMQP-CPP library uses C++11 features, so if you intend use it, please make
sure that your compiler is up-to-date and supports C++11. sure that your compiler is up-to-date and supports C++11.
**Note for the reader:** This readme file has a peculiar structure. We start **Note for the reader:** This readme file has a peculiar structure. We start
explaining the pure and hard core low level interface in which you have to explaining the pure and hard core low level interface in which you have to
take care of opening socket connections yourself. In reality, you probably want take care of opening socket connections yourself. In reality, you probably want
to use the simpler TCP interface that is being described [later on](#tcp-connections). to use the simpler TCP interface that is being described [later on](#tcp-connections).
@ -41,13 +43,13 @@ ABOUT
===== =====
This library is created and maintained by Copernica (www.copernica.com), and is This library is created and maintained by Copernica (www.copernica.com), and is
used inside the MailerQ (www.mailerq.com), Yothalot (www.yothalot.com) and used inside the MailerQ (www.mailerq.com), Yothalot (www.yothalot.com) and
AMQPipe (www.amqpipe.com) applications. MailerQ is a tool for sending large AMQPipe (www.amqpipe.com) applications. MailerQ is a tool for sending large
volumes of email, using AMQP message queues, Yothalot is a big data processing volumes of email, using AMQP message queues, Yothalot is a big data processing
map/reduce framework and AMQPipe is a tool for high-speed processing messages map/reduce framework and AMQPipe is a tool for high-speed processing messages
between AMQP pipes between AMQP pipes
Do you appreciate our work and are you looking for high quality email solutions? Do you appreciate our work and are you looking for high quality email solutions?
Then check out our other commercial and open source solutions: Then check out our other commercial and open source solutions:
* Copernica Marketing Suite (www.copernica.com) * Copernica Marketing Suite (www.copernica.com)
@ -62,24 +64,45 @@ Then check out our other commercial and open source solutions:
INSTALLING INSTALLING
========== ==========
AMQP-CPP comes with an optional Linux-only TCP module that takes care of the network part required for the AMQP-CPP core library. If you use this module, you are required to link with `pthread`.
If you are on some kind of Linux environment, installing the library is as easy There are two methods to compile AMQP-CPP: CMake and Make. CMake is platform portable, but the Makefile only works on Linux. Building of a shared library is currently not supported on Windows.
After building there are two relevant files to include when using the library.
File|Include when?
----|------------
amqpcpp.h|Always
amqpcpp/linux_tcp.h|If using the Linux-only TCP module
On Windows you are required to define `NOMINMAX` when compiling code that includes public AMQP-CPP header files.
## CMake
The CMake file supports both building and installing. You can choose not to use the install functionality, and instead manually use the build output at `bin/`. Keep in mind that the TCP module is only supported for Linux. An example install method would be:
``` bash
mkdir build
cd build
cmake .. [-DBUILD_SHARED] [-DLINUX_TCP]
cmake --build .. --target install
```
Option|Default|Meaning
------|-------|-------
BUILD_SHARED|OFF|Static lib(ON) or shared lib(OFF)? Shared is not supported on Windows.
LINUX_TCP|OFF|Should the Linux-only TCP module be built?
## Make
Installing the library is as easy
as running `make` and `make install`. This will install the full version of as running `make` and `make install`. This will install the full version of
the AMQP-CPP, including the system specific TCP module. If you do not need the the AMQP-CPP, including the system specific TCP module. If you do not need the
additional TCP module (because you take care of handling the network stuff additional TCP module (because you take care of handling the network stuff
yourself), you can also compile a pure form of the library. Use `make pure` yourself), you can also compile a pure form of the library. Use `make pure`
and `make install` for that. and `make install` for that.
For users on a non-Linux environment: this library is known to work on your
environment too (after all, it does not do any operating specific system calls),
but it might take some extra effort to get your compiler to compile it. Please
send in your pull requests once you have it running, so that others can benefit
from your experiences.
When you compile an application that uses the AMQP-CPP library, do not When you compile an application that uses the AMQP-CPP library, do not
forget to link with the library. For gcc and clang the linker flag is -lamqpcpp. forget to link with the library. For gcc and clang the linker flag is -lamqpcpp.
If you use the fullblown version of AMQP-CPP (with the TCP module), you also If you use the fullblown version of AMQP-CPP (with the TCP module), you also
need to pass a -lpthread linker flag, because the TCP module uses a thread need to pass a -lpthread linker flag, because the TCP module uses a thread
for running an asynchronous and non-blocking DNS hostname lookup. for running an asynchronous and non-blocking DNS hostname lookup.
@ -87,11 +110,11 @@ HOW TO USE AMQP-CPP
=================== ===================
As we mentioned above, the library can be used in a network-agnostic fashion. As we mentioned above, the library can be used in a network-agnostic fashion.
It then does not do any IO by itself, and you need to pass an object to the It then does not do any IO by itself, and you need to pass an object to the
library that the library can use for IO. So, before you start using the library that the library can use for IO. So, before you start using the
library, you first you need to create a class that extends from the library, you first you need to create a class that extends from the
ConnectionHandler base class. This is a class with a number of methods that are ConnectionHandler base class. This is a class with a number of methods that are
called by the library every time it wants to send out data, or when it needs to called by the library every time it wants to send out data, or when it needs to
inform you that an error occured. inform you that an error occured.
````c++ ````c++
@ -204,16 +227,16 @@ example call the "send()" or "write()" system call to send out the data to
the RabbitMQ server. But what about data in the other direction? How does the the RabbitMQ server. But what about data in the other direction? How does the
library receive data back from RabbitMQ? library receive data back from RabbitMQ?
In this raw setup, the AMQP-CPP library does not do any IO by itself and it is In this raw setup, the AMQP-CPP library does not do any IO by itself and it is
therefore also not possible for the library to receive data from a therefore also not possible for the library to receive data from a
socket. It is again up to you to do this. If, for example, you notice in your socket. It is again up to you to do this. If, for example, you notice in your
event loop that the socket that is connected with the RabbitMQ server becomes event loop that the socket that is connected with the RabbitMQ server becomes
readable, you should read out that socket (for example by using the recv() system readable, you should read out that socket (for example by using the recv() system
call), and pass the received bytes to the AMQP-CPP library. This is done by call), and pass the received bytes to the AMQP-CPP library. This is done by
calling the parse() method in the Connection object. calling the parse() method in the Connection object.
The Connection::parse() method gets two parameters, a pointer to a buffer of The Connection::parse() method gets two parameters, a pointer to a buffer of
data that you just read from the socket, and a parameter that holds the size of data that you just read from the socket, and a parameter that holds the size of
this buffer. The code snippet below comes from the Connection.h C++ header file. this buffer. The code snippet below comes from the Connection.h C++ header file.
````c++ ````c++
@ -249,9 +272,9 @@ both the old data, and the new data.
To optimize your calls to the parse() method, you _could_ use the Connection::expected() To optimize your calls to the parse() method, you _could_ use the Connection::expected()
and Connection::maxFrame() methods. The expected() method returns the number of bytes and Connection::maxFrame() methods. The expected() method returns the number of bytes
that the library prefers to receive next. It is pointless to call the parse() method that the library prefers to receive next. It is pointless to call the parse() method
with a smaller buffer, and it is best to call the method with a buffer of exactly this with a smaller buffer, and it is best to call the method with a buffer of exactly this
size. The maxFrame() returns the max frame size for AMQP messages. If you read your size. The maxFrame() returns the max frame size for AMQP messages. If you read your
messages into a reusable buffer, you could allocate this buffer up to this size, so that messages into a reusable buffer, you could allocate this buffer up to this size, so that
you never will have to reallocate. you never will have to reallocate.
@ -259,17 +282,17 @@ TCP CONNECTIONS
=============== ===============
Although the AMQP-CPP library gives you extreme flexibility by letting you setup Although the AMQP-CPP library gives you extreme flexibility by letting you setup
your own network connections, the reality is that most if not all AMQP connections your own network connections, the reality is that most if not all AMQP connections
use the TCP protocol. To help you out, the library therefore also comes with a use the TCP protocol. To help you out, the library therefore also comes with a
TCP module that takes care of setting up the network connections, and sending TCP module that takes care of setting up the network connections, and sending
and receiving the data. and receiving the data.
If you want to use this TCP module, you should not use the AMQP::Connection If you want to use this TCP module, you should not use the AMQP::Connection
and AMQP::Channel classes that you saw above, but the alternative AMQP::TcpConnection and AMQP::Channel classes that you saw above, but the alternative AMQP::TcpConnection
and AMQP::TcpChannel classes instead. You also do not have to create your own class and AMQP::TcpChannel classes instead. You also do not have to create your own class
that implements the "AMQP::ConnectionHandler" interface - but a class that inherits that implements the "AMQP::ConnectionHandler" interface - but a class that inherits
from "AMQP::TcpHandler" instead. You especially need to implement the "monitor()" from "AMQP::TcpHandler" instead. You especially need to implement the "monitor()"
method in that class, as that is needed by the AMQP-CPP library to interact with method in that class, as that is needed by the AMQP-CPP library to interact with
the main event loop: the main event loop:
````c++ ````c++
@ -334,7 +357,7 @@ class MyTcpHandler : public AMQP::TcpHandler
// descriptor to the main application event loop (like the select() or // descriptor to the main application event loop (like the select() or
// poll() loop). When the event loop reports that the descriptor becomes // poll() loop). When the event loop reports that the descriptor becomes
// readable and/or writable, it is up to you to inform the AMQP-CPP // readable and/or writable, it is up to you to inform the AMQP-CPP
// library that the filedescriptor is active by calling the // library that the filedescriptor is active by calling the
// connection->process(fd, flags) method. // connection->process(fd, flags) method.
} }
}; };
@ -344,8 +367,8 @@ The "monitor()" method can be used to integrate the AMQP filedescriptors in your
application's event loop. For some popular event loops (libev, libevent), we have application's event loop. For some popular event loops (libev, libevent), we have
already added example handler objects (see the next section for that). already added example handler objects (see the next section for that).
Using the TCP module of the AMQP-CPP library is easier than using the Using the TCP module of the AMQP-CPP library is easier than using the
raw AMQP::Connection and AMQP::Channel objects, because you do not have to raw AMQP::Connection and AMQP::Channel objects, because you do not have to
create the sockets and connections yourself, and you also do not have to take create the sockets and connections yourself, and you also do not have to take
care of buffering network data. care of buffering network data.
@ -376,9 +399,9 @@ EXISTING EVENT LOOPS
Both the pure AMQP::Connection as well as the easier AMQP::TcpConnection class Both the pure AMQP::Connection as well as the easier AMQP::TcpConnection class
allow you to integrate AMQP-CPP in your own event loop. Whether you take care allow you to integrate AMQP-CPP in your own event loop. Whether you take care
of running the event loop yourself (for example by using the select() system of running the event loop yourself (for example by using the select() system
call), or if you use an existing library for it (like libevent, libev or libuv), call), or if you use an existing library for it (like libevent, libev or libuv),
you can implement the "monitor()" method to watch the file descriptors and you can implement the "monitor()" method to watch the file descriptors and
hand over control back to AMQP-CPP when one of the sockets become active. hand over control back to AMQP-CPP when one of the sockets become active.
For libev and libevent users, we have even implemented an example implementation, For libev and libevent users, we have even implemented an example implementation,
@ -394,26 +417,26 @@ int main()
{ {
// access to the event loop // access to the event loop
auto *loop = EV_DEFAULT; auto *loop = EV_DEFAULT;
// handler for libev (so we don't have to implement AMQP::TcpHandler!) // handler for libev (so we don't have to implement AMQP::TcpHandler!)
AMQP::LibEvHandler handler(loop); AMQP::LibEvHandler handler(loop);
// make a connection // make a connection
AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://localhost/")); AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://localhost/"));
// we need a channel too // we need a channel too
AMQP::TcpChannel channel(&connection); AMQP::TcpChannel channel(&connection);
// create a temporary queue // create a temporary queue
channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) { channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) {
// report the name of the temporary queue // report the name of the temporary queue
std::cout << "declared queue " << name << std::endl; std::cout << "declared queue " << name << std::endl;
// now we can close the connection // now we can close the connection
connection.close(); connection.close();
}); });
// run the loop // run the loop
ev_run(loop, 0); ev_run(loop, 0);
@ -426,7 +449,7 @@ The AMQP::LibEvHandler and AMQP::LibEventHandler classes are extended AMQP::TcpH
classes, with an implementation of the monitor() method that simply adds the classes, with an implementation of the monitor() method that simply adds the
filedescriptor to the event loop. If you use this class however, it is recommended not to filedescriptor to the event loop. If you use this class however, it is recommended not to
instantiate it directly (like we did in the example), but to create your own instantiate it directly (like we did in the example), but to create your own
"MyHandler" class that extends from it, and in which you also implement the "MyHandler" class that extends from it, and in which you also implement the
onError() method to report possible connection errors to your end users. onError() method to report possible connection errors to your end users.
Currently, we have example TcpHandler implementations for libev, Currently, we have example TcpHandler implementations for libev,
@ -443,15 +466,15 @@ such examples.
HEARTBEATS HEARTBEATS
========== ==========
The AMQP protocol supports *heartbeats*. If this heartbeat feature is enabled, the The AMQP protocol supports *heartbeats*. If this heartbeat feature is enabled, the
client and the server negotiate a heartbeat interval during connection setup, and client and the server negotiate a heartbeat interval during connection setup, and
they agree to send at least *some kind of data* over the connection during every they agree to send at least *some kind of data* over the connection during every
iteration of that interval. The normal data that is sent over the connection (like iteration of that interval. The normal data that is sent over the connection (like
publishing or consuming messages) is normally sufficient to keep the connection alive, publishing or consuming messages) is normally sufficient to keep the connection alive,
but if the client or server was idle during the negotiated interval time, a dummy but if the client or server was idle during the negotiated interval time, a dummy
heartbeat message must be sent instead. heartbeat message must be sent instead.
The default behavior of the AMQP-CPP library is to disable heartbeats. The The default behavior of the AMQP-CPP library is to disable heartbeats. The
proposed heartbeat interval of the server during connection setup (the server proposed heartbeat interval of the server during connection setup (the server
normally suggests an interval of 60 seconds) is vetoed by the AMQP-CPP library so normally suggests an interval of 60 seconds) is vetoed by the AMQP-CPP library so
no heartbeats are ever needed to be sent over the connection. This means that you no heartbeats are ever needed to be sent over the connection. This means that you
@ -460,7 +483,7 @@ lasting algorithms after you've consumed a message from RabbitMQ, without having
to worry about the connection being idle for too long. to worry about the connection being idle for too long.
You can however choose to enable these heartbeats. If you want to enable heartbeats, You can however choose to enable these heartbeats. If you want to enable heartbeats,
simple implement the onNegotiate() method inside your ConnectionHandler or simple implement the onNegotiate() method inside your ConnectionHandler or
TcpHandler class and have it return the interval that you find appropriate. TcpHandler class and have it return the interval that you find appropriate.
````c++ ````c++
@ -478,14 +501,14 @@ class MyTcpHandler : public AMQP::TcpHandler
*/ */
virtual void onNegotiate(AMQP::TcpConnection *connection, uint16_t interval) virtual void onNegotiate(AMQP::TcpConnection *connection, uint16_t interval)
{ {
// we accept the suggestion from the server, but if the interval is smaller // we accept the suggestion from the server, but if the interval is smaller
// that one minute, we will use a one minute interval instead // that one minute, we will use a one minute interval instead
if (interval < 60) interval = 60; if (interval < 60) interval = 60;
// @todo // @todo
// set a timer in your event loop, and make sure that you call // set a timer in your event loop, and make sure that you call
// connection->heartbeat() every _interval_ seconds. // connection->heartbeat() every _interval_ seconds.
// return the interval that we want to use // return the interval that we want to use
return interval; return interval;
} }
@ -503,7 +526,7 @@ In the libev event loop implementation the heartbeats are enabled by default.
CHANNELS CHANNELS
======== ========
In the above example we created a channel object. A channel is a sort of virtual In the above example we created a channel object. A channel is a sort of virtual
connection, and it is possible to create many channels that all use connection, and it is possible to create many channels that all use
the same connection. the same connection.
@ -517,8 +540,8 @@ documented.
All operations that you can perform on a channel are non-blocking. This means All operations that you can perform on a channel are non-blocking. This means
that it is not possible for a method (like Channel::declareExchange()) to that it is not possible for a method (like Channel::declareExchange()) to
immediately return 'true' or 'false'. Instead, almost every method of the Channel immediately return 'true' or 'false'. Instead, almost every method of the Channel
class returns an instance of the 'Deferred' class. This 'Deferred' object can be class returns an instance of the 'Deferred' class. This 'Deferred' object can be
used to install handlers that will be called in case of success or failure. used to install handlers that will be called in case of success or failure.
For example, if you call the channel.declareExchange() method, the AMQP-CPP library For example, if you call the channel.declareExchange() method, the AMQP-CPP library
@ -610,7 +633,7 @@ channel object right after it was constructed.
CHANNEL ERRORS CHANNEL ERRORS
============== ==============
It is important to realize that any error that occurs on a channel, It is important to realize that any error that occurs on a channel,
invalidates the entire channel, including all subsequent instructions that invalidates the entire channel, including all subsequent instructions that
were already sent over it. This means that if you call multiple methods in a row, were already sent over it. This means that if you call multiple methods in a row,
and the first method fails, all subsequent methods will not be executed either: and the first method fails, all subsequent methods will not be executed either:
@ -642,7 +665,7 @@ requires and extra instruction to be sent to the RabbitMQ server, so some extra
bytes are sent over the network, and some additional resources in both the client bytes are sent over the network, and some additional resources in both the client
application and the RabbitMQ server are used (although this is all very limited). application and the RabbitMQ server are used (although this is all very limited).
If possible, it is best to make use of this feature. For example, if you have an important AMQP If possible, it is best to make use of this feature. For example, if you have an important AMQP
connection that you use for consuming messages, and at the same time you want connection that you use for consuming messages, and at the same time you want
to send another instruction to RabbitMQ (like declaring a temporary queue), it is to send another instruction to RabbitMQ (like declaring a temporary queue), it is
best to set up a new channel for this 'declare' instruction. If the declare fails, best to set up a new channel for this 'declare' instruction. If the declare fails,
@ -657,7 +680,7 @@ void myDeclareMethod(AMQP::Connection *connection)
{ {
// create temporary channel to declare a queue // create temporary channel to declare a queue
AMQP::Channel channel(connection); AMQP::Channel channel(connection);
// declare the queue (the channel object is destructed before the // declare the queue (the channel object is destructed before the
// instruction reaches the server, but the AMQP-CPP library can deal // instruction reaches the server, but the AMQP-CPP library can deal
// with this) // with this)
@ -840,8 +863,8 @@ channel.commitTransaction()
}); });
```` ````
Note that AMQP transactions are not as powerful as transactions that are Note that AMQP transactions are not as powerful as transactions that are
knows in the database world. It is not possible to wrap all sort of knows in the database world. It is not possible to wrap all sort of
operations in a transaction, they are only meaningful for publishing operations in a transaction, they are only meaningful for publishing
and consuming. and consuming.
@ -992,4 +1015,3 @@ class that can be directly plugged into libev, libevent and libuv event loops.
For performance reasons, we need to investigate if we can limit the number of times For performance reasons, we need to investigate if we can limit the number of times
an incoming or outgoing messages is copied. an incoming or outgoing messages is copied.

View File

@ -1,77 +0,0 @@
/**
* AMQP.h
*
* Starting point for all includes of the Copernica AMQP library
*
* @documentation public
*/
#pragma once
// base C++ include files
#include <vector>
#include <string>
#include <memory>
#include <map>
#include <unordered_map>
#include <queue>
#include <limits>
#include <cstddef>
#include <cstring>
#include <stdexcept>
#include <utility>
#include <iostream>
#include <algorithm>
#include <functional>
// base C include files
#include <stdint.h>
#include <math.h>
// forward declarations
#include <amqpcpp/classes.h>
// utility classes
#include <amqpcpp/endian.h>
#include <amqpcpp/buffer.h>
#include <amqpcpp/bytebuffer.h>
#include <amqpcpp/receivedframe.h>
#include <amqpcpp/outbuffer.h>
#include <amqpcpp/watchable.h>
#include <amqpcpp/monitor.h>
// amqp types
#include <amqpcpp/field.h>
#include <amqpcpp/numericfield.h>
#include <amqpcpp/decimalfield.h>
#include <amqpcpp/stringfield.h>
#include <amqpcpp/booleanset.h>
#include <amqpcpp/fieldproxy.h>
#include <amqpcpp/table.h>
#include <amqpcpp/array.h>
// envelope for publishing and consuming
#include <amqpcpp/metadata.h>
#include <amqpcpp/envelope.h>
#include <amqpcpp/message.h>
// mid level includes
#include <amqpcpp/exchangetype.h>
#include <amqpcpp/flags.h>
#include <amqpcpp/callbacks.h>
#include <amqpcpp/deferred.h>
#include <amqpcpp/deferredconsumer.h>
#include <amqpcpp/deferredqueue.h>
#include <amqpcpp/deferreddelete.h>
#include <amqpcpp/deferredcancel.h>
#include <amqpcpp/deferredget.h>
#include <amqpcpp/channelimpl.h>
#include <amqpcpp/channel.h>
#include <amqpcpp/login.h>
#include <amqpcpp/address.h>
#include <amqpcpp/connectionhandler.h>
#include <amqpcpp/connectionimpl.h>
#include <amqpcpp/connection.h>
#include <amqpcpp/tcphandler.h>
#include <amqpcpp/tcpconnection.h>
#include <amqpcpp/tcpchannel.h>

19
appveyor.yml Normal file
View File

@ -0,0 +1,19 @@
version: '1.0.{build}'
image: Visual Studio 2017
platform:
- x64
configuration:
- Release
- Debug
install:
- git submodule update --init --recursive
before_build:
- cmake -G "Visual Studio 15 2017 Win64" .
build:
project: $(APPVEYOR_BUILD_FOLDER)\amqpcpp.sln

32
examples/CMakeLists.txt Normal file
View File

@ -0,0 +1,32 @@
###################################
# Boost
###################################
add_executable(amqpcpp_boost_example libboostasio.cpp)
add_dependencies(amqpcpp_boost_example amqpcpp)
target_link_libraries(amqpcpp_boost_example amqpcpp boost_system pthread)
###################################
# Libev
###################################
add_executable(amqpcpp_libev_example libev.cpp)
add_dependencies(amqpcpp_libev_example amqpcpp)
target_link_libraries(amqpcpp_libev_example amqpcpp ev pthread)
###################################
# Libuv
###################################
add_executable(amqpcpp_libuv_example libuv.cpp)
add_dependencies(amqpcpp_libuv_example amqpcpp)
target_link_libraries(amqpcpp_libuv_example amqpcpp uv pthread)

View File

@ -1,54 +0,0 @@
add_sources(
address.h
addresses.h
array.h
booleanset.h
buffer.h
bytebuffer.h
callbacks.h
channel.h
channelimpl.h
classes.h
connection.h
connectionhandler.h
connectionimpl.h
copiedbuffer.h
decimalfield.h
deferred.h
deferredcancel.h
deferredconsumer.h
deferredconsumerbase.h
deferreddelete.h
deferredget.h
deferredqueue.h
endian.h
entityimpl.h
envelope.h
exception.h
exchangetype.h
field.h
fieldproxy.h
flags.h
frame.h
libboostasio.h
libev.h
libevent.h
libuv.h
login.h
message.h
metadata.h
monitor.h
numericfield.h
outbuffer.h
protocolexception.h
receivedframe.h
stack_ptr.h
stringfield.h
table.h
tcpchannel.h
tcpconnection.h
tcpdefines.h
tcphandler.h
watchable.h
)

84
include/amqpcpp.h Normal file
View File

@ -0,0 +1,84 @@
/**
* AMQP.h
*
* Starting point for all includes of the Copernica AMQP library
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2015 - 2018 Copernica BV
*/
#pragma once
// base C++ include files
#include <vector>
#include <string>
#include <memory>
#include <map>
#include <unordered_map>
#include <queue>
#include <limits>
#include <cstddef>
#include <cstring>
#include <stdexcept>
#include <utility>
#include <iostream>
#include <algorithm>
#include <functional>
// base C include files
#include <stdint.h>
#include <math.h>
// fix strcasecmp on non linux platforms
#if (defined(_WIN16) || defined(_WIN32) || defined(_WIN64) || defined(__WINDOWS__)) && !defined(__CYGWIN__)
#define strcasecmp _stricmp
#endif
// forward declarations
#include "amqpcpp/classes.h"
// utility classes
#include "amqpcpp/endian.h"
#include "amqpcpp/buffer.h"
#include "amqpcpp/bytebuffer.h"
#include "amqpcpp/receivedframe.h"
#include "amqpcpp/outbuffer.h"
#include "amqpcpp/watchable.h"
#include "amqpcpp/monitor.h"
// amqp types
#include "amqpcpp/field.h"
#include "amqpcpp/numericfield.h"
#include "amqpcpp/decimalfield.h"
#include "amqpcpp/stringfield.h"
#include "amqpcpp/booleanset.h"
#include "amqpcpp/fieldproxy.h"
#include "amqpcpp/table.h"
#include "amqpcpp/array.h"
// envelope for publishing and consuming
#include "amqpcpp/metadata.h"
#include "amqpcpp/envelope.h"
#include "amqpcpp/message.h"
// mid level includes
#include "amqpcpp/exchangetype.h"
#include "amqpcpp/flags.h"
#include "amqpcpp/callbacks.h"
#include "amqpcpp/deferred.h"
#include "amqpcpp/deferredconsumer.h"
#include "amqpcpp/deferredqueue.h"
#include "amqpcpp/deferreddelete.h"
#include "amqpcpp/deferredcancel.h"
#include "amqpcpp/deferredget.h"
#include "amqpcpp/channelimpl.h"
#include "amqpcpp/channel.h"
#include "amqpcpp/login.h"
#include "amqpcpp/address.h"
#include "amqpcpp/connectionhandler.h"
#include "amqpcpp/connectionimpl.h"
#include "amqpcpp/connection.h"
// tcp level includes
#include "amqpcpp/linux_tcp.h"

View File

@ -72,7 +72,7 @@
#define be32toh(x) ntohl(x) #define be32toh(x) ntohl(x)
#define le32toh(x) (x) #define le32toh(x) (x)
#define htobe64(x) htonll(x) #define htobe64(x) ((1==htonl(1)) ? (x) : ((uint64_t)htonl((x) & 0xFFFFFFFF) << 32) | htonl((x) >> 32))
#define htole64(x) (x) #define htole64(x) (x)
#define be64toh(x) ntohll(x) #define be64toh(x) ntohll(x)
#define le64toh(x) (x) #define le64toh(x) (x)

View File

@ -25,36 +25,16 @@
#include <boost/asio/deadline_timer.hpp> #include <boost/asio/deadline_timer.hpp>
#include <boost/asio/posix/stream_descriptor.hpp> #include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/function.hpp>
#include "amqpcpp/linux_tcp.h"
/////////////////////////////////////////////////////////////////// // C++17 has 'weak_from_this()' support.
#define STRAND_SOCKET_HANDLER(_fn) \ #if __cplusplus >= 201701L
[fn = _fn, strand = _strand](const boost::system::error_code &ec, \ #define PTR_FROM_THIS weak_from_this
const std::size_t bytes_transferred) \ #else
{ \ #define PTR_FROM_THIS shared_from_this
const std::shared_ptr<boost::asio::io_service::strand> apStrand = strand.lock(); \ #endif
if (!apStrand) \
{ \
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled),std::size_t{0}); \
return; \
} \
\
apStrand->dispatch(boost::bind(fn,ec,bytes_transferred)); \
}
///////////////////////////////////////////////////////////////////
#define STRAND_TIMER_HANDLER(_fn) \
[fn = _fn, strand = _strand](const boost::system::error_code &ec) \
{ \
const std::shared_ptr<boost::asio::io_service::strand> apStrand = strand.lock(); \
if (!apStrand) \
{ \
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled)); \
return; \
} \
\
apStrand->dispatch(boost::bind(fn,ec)); \
}
/** /**
* Set up namespace * Set up namespace
@ -82,11 +62,13 @@ private:
*/ */
boost::asio::io_service & _ioservice; boost::asio::io_service & _ioservice;
typedef std::weak_ptr<boost::asio::io_service::strand> strand_weak_ptr;
/** /**
* The boost asio io_service::strand managed pointer. * The boost asio io_service::strand managed pointer.
* @var class std::shared_ptr<boost::asio::io_service> * @var class std::shared_ptr<boost::asio::io_service>
*/ */
std::weak_ptr<boost::asio::io_service::strand> _strand; strand_weak_ptr _wpstrand;
/** /**
* The boost tcp socket. * The boost tcp socket.
@ -120,6 +102,66 @@ private:
*/ */
bool _write_pending{false}; bool _write_pending{false};
using handler_cb = boost::function<void(boost::system::error_code,std::size_t)>;
using io_handler = boost::function<void(const boost::system::error_code&, const std::size_t)>;
/**
* Builds a io handler callback that executes the io callback in a strand.
* @param io_handler The handler callback to dispatch
* @return handler_cb A function wrapping the execution of the handler function in a io_service::strand.
*/
handler_cb get_dispatch_wrapper(io_handler fn)
{
const strand_weak_ptr wpstrand = _wpstrand;
return [fn, wpstrand](const boost::system::error_code &ec, const std::size_t bytes_transferred)
{
const strand_shared_ptr strand = wpstrand.lock();
if (!strand)
{
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled), std::size_t{0});
return;
}
strand->dispatch(boost::bind(fn, ec, bytes_transferred));
};
}
/**
* Binds and returns a read handler for the io operation.
* @param connection The connection being watched.
* @param fd The file descripter being watched.
* @return handler callback
*/
handler_cb get_read_handler(TcpConnection *const connection, const int fd)
{
auto fn = boost::bind(&Watcher::read_handler,
this,
_1,
_2,
PTR_FROM_THIS(),
connection,
fd);
return get_dispatch_wrapper(fn);
}
/**
* Binds and returns a read handler for the io operation.
* @param connection The connection being watched.
* @param fd The file descripter being watched.
* @return handler callback
*/
handler_cb get_write_handler(TcpConnection *const connection, const int fd)
{
auto fn = boost::bind(&Watcher::write_handler,
this,
_1,
_2,
PTR_FROM_THIS(),
connection,
fd);
return get_dispatch_wrapper(fn);
}
/** /**
* Handler method that is called by boost's io_service when the socket pumps a read event. * Handler method that is called by boost's io_service when the socket pumps a read event.
* @param ec The status of the callback. * @param ec The status of the callback.
@ -147,21 +189,10 @@ private:
connection->process(fd, AMQP::readable); connection->process(fd, AMQP::readable);
_read_pending = true; _read_pending = true;
_socket.async_read_some(boost::asio::null_buffers(), _socket.async_read_some(
STRAND_SOCKET_HANDLER( boost::asio::null_buffers(),
boost::bind(&Watcher::read_handler, get_read_handler(connection, fd));
this,
boost::arg<1>(),
boost::arg<2>(),
// C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L
weak_from_this(),
#else
shared_from_this(),
#endif
connection,
fd)));
} }
} }
@ -193,20 +224,9 @@ private:
_write_pending = true; _write_pending = true;
_socket.async_write_some(boost::asio::null_buffers(), _socket.async_write_some(
STRAND_SOCKET_HANDLER( boost::asio::null_buffers(),
boost::bind(&Watcher::write_handler, get_write_handler(connection, fd));
this,
boost::arg<1>(),
boost::arg<2>(),
// C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L
weak_from_this(),
#else
shared_from_this(),
#endif
connection,
fd)));
} }
} }
@ -215,14 +235,14 @@ private:
* Constructor- initialises the watcher and assigns the filedescriptor to * Constructor- initialises the watcher and assigns the filedescriptor to
* a boost socket for monitoring. * a boost socket for monitoring.
* @param io_service The boost io_service * @param io_service The boost io_service
* @param strand A weak pointer to a io_service::strand instance. * @param wpstrand A weak pointer to a io_service::strand instance.
* @param fd The filedescriptor being watched * @param fd The filedescriptor being watched
*/ */
Watcher(boost::asio::io_service &io_service, Watcher(boost::asio::io_service &io_service,
const std::weak_ptr<boost::asio::io_service::strand> strand, const strand_weak_ptr wpstrand,
const int fd) : const int fd) :
_ioservice(io_service), _ioservice(io_service),
_strand(strand), _wpstrand(wpstrand),
_socket(_ioservice) _socket(_ioservice)
{ {
_socket.assign(fd); _socket.assign(fd);
@ -262,20 +282,9 @@ private:
{ {
_read_pending = true; _read_pending = true;
_socket.async_read_some(boost::asio::null_buffers(), _socket.async_read_some(
STRAND_SOCKET_HANDLER( boost::asio::null_buffers(),
boost::bind(&Watcher::read_handler, get_read_handler(connection, fd));
this,
boost::arg<1>(),
boost::arg<2>(),
// C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L
weak_from_this(),
#else
shared_from_this(),
#endif
connection,
fd)));
} }
// 2. Handle writes? // 2. Handle writes?
@ -286,20 +295,9 @@ private:
{ {
_write_pending = true; _write_pending = true;
_socket.async_write_some(boost::asio::null_buffers(), _socket.async_write_some(
STRAND_SOCKET_HANDLER( boost::asio::null_buffers(),
boost::bind(&Watcher::write_handler, get_write_handler(connection, fd));
this,
boost::arg<1>(),
boost::arg<2>(),
// C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L
weak_from_this(),
#else
shared_from_this(),
#endif
connection,
fd)));
} }
} }
}; };
@ -317,11 +315,13 @@ private:
*/ */
boost::asio::io_service & _ioservice; boost::asio::io_service & _ioservice;
typedef std::weak_ptr<boost::asio::io_service::strand> strand_weak_ptr;
/** /**
* The boost asio io_service::strand managed pointer. * The boost asio io_service::strand managed pointer.
* @var class std::shared_ptr<boost::asio::io_service> * @var class std::shared_ptr<boost::asio::io_service>
*/ */
std::weak_ptr<boost::asio::io_service::strand> _strand; strand_weak_ptr _wpstrand;
/** /**
* The boost asynchronous deadline timer. * The boost asynchronous deadline timer.
@ -329,11 +329,42 @@ private:
*/ */
boost::asio::deadline_timer _timer; boost::asio::deadline_timer _timer;
using handler_fn = boost::function<void(boost::system::error_code)>;
/**
* Binds and returns a lamba function handler for the io operation.
* @param connection The connection being watched.
* @param timeout The file descripter being watched.
* @return handler callback
*/
handler_fn get_handler(TcpConnection *const connection, const uint16_t timeout)
{
const auto fn = boost::bind(&Timer::timeout,
this,
_1,
PTR_FROM_THIS(),
connection,
timeout);
const strand_weak_ptr wpstrand = _wpstrand;
return [fn, wpstrand](const boost::system::error_code &ec)
{
const strand_shared_ptr strand = wpstrand.lock();
if (!strand)
{
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled));
return;
}
strand->dispatch(boost::bind(fn, ec));
};
}
/** /**
* Callback method that is called by libev when the timer expires * Callback method that is called by libev when the timer expires
* @param ec error code returned from loop
* @param loop The loop in which the event was triggered * @param loop The loop in which the event was triggered
* @param timer Internal timer object * @param connection
* @param revents The events that triggered this call * @param timeout
*/ */
void timeout(const boost::system::error_code &ec, void timeout(const boost::system::error_code &ec,
std::weak_ptr<Timer> awpThis, std::weak_ptr<Timer> awpThis,
@ -357,18 +388,7 @@ private:
_timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout)); _timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout));
// Posts the timer event // Posts the timer event
_timer.async_wait(STRAND_TIMER_HANDLER( _timer.async_wait(get_handler(connection, timeout));
boost::bind(&Timer::timeout,
this,
boost::arg<1>(),
// C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L
weak_from_this(),
#else
shared_from_this(),
#endif
connection,
timeout)));
} }
} }
@ -385,13 +405,13 @@ private:
/** /**
* Constructor * Constructor
* @param io_service The boost asio io_service. * @param io_service The boost asio io_service.
* @param strand A weak pointer to a io_service::strand instance. * @param wpstrand A weak pointer to a io_service::strand instance.
*/ */
Timer(boost::asio::io_service &io_service, Timer(boost::asio::io_service &io_service,
const std::weak_ptr<boost::asio::io_service::strand> strand) : const strand_weak_ptr wpstrand) :
_ioservice(io_service), _ioservice(io_service),
_strand(strand), _wpstrand(wpstrand),
_timer(_ioservice) _timer(_ioservice)
{ {
} }
@ -423,19 +443,11 @@ private:
// stop timer in case it was already set // stop timer in case it was already set
stop(); stop();
// Reschedule the timer for the future:
_timer.expires_from_now(boost::posix_time::seconds(timeout)); _timer.expires_from_now(boost::posix_time::seconds(timeout));
_timer.async_wait(STRAND_TIMER_HANDLER(
boost::bind(&Timer::timeout, // Posts the timer event
this, _timer.async_wait(get_handler(connection, timeout));
boost::arg<1>(),
// C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L
weak_from_this(),
#else
shared_from_this(),
#endif
connection,
timeout)));
} }
}; };
@ -445,11 +457,13 @@ private:
*/ */
boost::asio::io_service & _ioservice; boost::asio::io_service & _ioservice;
typedef std::shared_ptr<boost::asio::io_service::strand> strand_shared_ptr;
/** /**
* The boost asio io_service::strand managed pointer. * The boost asio io_service::strand managed pointer.
* @var class std::shared_ptr<boost::asio::io_service> * @var class std::shared_ptr<boost::asio::io_service>
*/ */
std::shared_ptr<boost::asio::io_service::strand> _strand; strand_shared_ptr _strand;
/** /**

View File

@ -8,7 +8,7 @@
* Compile with: "g++ -std=c++11 libev.cpp -lamqpcpp -lev -lpthread" * Compile with: "g++ -std=c++11 libev.cpp -lamqpcpp -lev -lpthread"
* *
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com> * @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2015 Copernica BV * @copyright 2015 - 2018 Copernica BV
*/ */
/** /**
@ -21,6 +21,8 @@
*/ */
#include <ev.h> #include <ev.h>
#include "amqpcpp/linux_tcp.h"
/** /**
* Set up namespace * Set up namespace
*/ */

View File

@ -21,6 +21,8 @@
*/ */
#include <uv.h> #include <uv.h>
#include "amqpcpp/linux_tcp.h"
/** /**
* Set up namespace * Set up namespace
*/ */

View File

@ -0,0 +1,3 @@
#include "linux_tcp/tcphandler.h"
#include "linux_tcp/tcpconnection.h"
#include "linux_tcp/tcpchannel.h"

View File

@ -21,6 +21,7 @@
#include "envelope.h" #include "envelope.h"
#include <limits> #include <limits>
#include <stdexcept> #include <stdexcept>
#include <algorithm>
/** /**
* Set up namespace * Set up namespace
@ -93,10 +94,10 @@ protected:
size = std::min(size, _bodySize - _filled); size = std::min(size, _bodySize - _filled);
// append more data // append more data
memcpy(_body + _filled, buffer, size); memcpy(_body + _filled, buffer, (size_t)size);
// update filled data // update filled data
_filled += size; _filled += (size_t)size;
} }
else if (size >= _bodySize) else if (size >= _bodySize)
{ {
@ -107,16 +108,16 @@ protected:
else else
{ {
// allocate the buffer // allocate the buffer
_body = (char *)malloc(_bodySize); _body = (char *)malloc((size_t)_bodySize);
// remember that the buffer was allocated, so that the destructor can get rid of it // remember that the buffer was allocated, so that the destructor can get rid of it
_allocated = true; _allocated = true;
// append more data // append more data
memcpy(_body, buffer, std::min(size, _bodySize)); memcpy(_body, buffer, std::min((size_t)size, (size_t)_bodySize));
// update filled data // update filled data
_filled = std::min(size, _bodySize); _filled = std::min((size_t)size, (size_t)_bodySize);
} }
// check if we're done // check if we're done

View File

@ -302,20 +302,20 @@ public:
// the result (2 for the two boolean sets) // the result (2 for the two boolean sets)
uint32_t result = 2; uint32_t result = 2;
if (hasExpiration()) result += _expiration.size(); if (hasExpiration()) result += (uint32_t)_expiration.size();
if (hasReplyTo()) result += _replyTo.size(); if (hasReplyTo()) result += (uint32_t)_replyTo.size();
if (hasCorrelationID()) result += _correlationID.size(); if (hasCorrelationID()) result += (uint32_t)_correlationID.size();
if (hasPriority()) result += _priority.size(); if (hasPriority()) result += (uint32_t)_priority.size();
if (hasDeliveryMode()) result += _deliveryMode.size(); if (hasDeliveryMode()) result += (uint32_t)_deliveryMode.size();
if (hasHeaders()) result += _headers.size(); if (hasHeaders()) result += (uint32_t)_headers.size();
if (hasContentEncoding()) result += _contentEncoding.size(); if (hasContentEncoding()) result += (uint32_t)_contentEncoding.size();
if (hasContentType()) result += _contentType.size(); if (hasContentType()) result += (uint32_t)_contentType.size();
if (hasClusterID()) result += _clusterID.size(); if (hasClusterID()) result += (uint32_t)_clusterID.size();
if (hasAppID()) result += _appID.size(); if (hasAppID()) result += (uint32_t)_appID.size();
if (hasUserID()) result += _userID.size(); if (hasUserID()) result += (uint32_t)_userID.size();
if (hasTypeName()) result += _typeName.size(); if (hasTypeName()) result += (uint32_t)_typeName.size();
if (hasTimestamp()) result += _timestamp.size(); if (hasTimestamp()) result += (uint32_t)_timestamp.size();
if (hasMessageID()) result += _messageID.size(); if (hasMessageID()) result += (uint32_t)_messageID.size();
// done // done
return result; return result;

View File

@ -1,6 +1,6 @@
/** /**
* Numeric field types for AMQP * Numeric field types for AMQP
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -42,6 +42,8 @@ private:
T _value; T _value;
public: public:
using Type = T;
/** /**
* Default constructor, assign 0 * Default constructor, assign 0
*/ */
@ -116,14 +118,14 @@ public:
* Get the value * Get the value
* @return mixed * @return mixed
*/ */
operator uint8_t () const override { return _value; } operator uint8_t () const override { return (uint8_t)_value; }
operator uint16_t() const override { return _value; } operator uint16_t() const override { return (uint16_t)_value; }
operator uint32_t() const override { return _value; } operator uint32_t() const override { return (uint32_t)_value; }
operator uint64_t() const override { return _value; } operator uint64_t() const override { return (uint64_t)_value; }
operator int8_t () const override { return _value; } operator int8_t () const override { return (int8_t)_value; }
operator int16_t () const override { return _value; } operator int16_t () const override { return (int16_t)_value; }
operator int32_t () const override { return _value; } operator int32_t () const override { return (int32_t)_value; }
operator int64_t () const override { return _value; } operator int64_t () const override { return (int64_t)_value; }
/** /**
* Get the value * Get the value
@ -213,4 +215,3 @@ typedef NumericField<double, 'd'> Double;
* end namespace * end namespace
*/ */
} }

View File

@ -119,7 +119,7 @@ public:
virtual size_t size() const override virtual size_t size() const override
{ {
// find out size of the size parameter // find out size of the size parameter
T size(_data.size()); T size((typename T::Type)_data.size());
// size of the uint8 or uint32 + the actual string size // size of the uint8 or uint32 + the actual string size
return size.size() + _data.size(); return size.size() + _data.size();
@ -160,7 +160,7 @@ public:
virtual void fill(OutBuffer& buffer) const override virtual void fill(OutBuffer& buffer) const override
{ {
// create size // create size
T size(_data.size()); T size((typename T::Type)_data.size());
// first, write down the size of the string // first, write down the size of the string
size.fill(buffer); size.fill(buffer);
@ -210,4 +210,3 @@ typedef StringField<ULong, 'S'> LongString;
* end namespace * end namespace
*/ */
} }

View File

@ -1,55 +0,0 @@
# Version
cmake_minimum_required(VERSION 2.6.3)
set(CXX_NORM_CXX98 1) # C++98
set(CXX_NORM_CXX03 2) # C++03
set(CXX_NORM_CXX11 3) # C++11
# - Set the wanted C++ norm
# Adds the good argument to the command line in function of the compiler
# Currently only works with g++ and clang++
macro(set_cxx_norm NORM)
# Extract c++ compiler --version output
exec_program(
${CMAKE_CXX_COMPILER}
ARGS --version
OUTPUT_VARIABLE _compiler_output
)
# Keep only the first line
string(REGEX REPLACE
"(\n.*$)"
""
cxx_compiler_version "${_compiler_output}"
)
# Extract the version number
string(REGEX REPLACE
"([^0-9.])|([0-9.][^0-9.])"
""
cxx_compiler_version "${cxx_compiler_version}"
)
if(CMAKE_COMPILER_IS_GNUCXX)
if(${NORM} EQUAL ${CXX_NORM_CXX98})
add_definitions("-std=c++98")
elseif(${NORM} EQUAL ${CXX_NORM_CXX03})
add_definitions("-std=c++03")
elseif(${NORM} EQUAL ${CXX_NORM_CXX11})
if(${cxx_compiler_version} VERSION_LESS "4.7.0")
add_definitions("-std=c++0x")
else()
add_definitions("-std=c++11")
endif()
endif()
elseif(${CMAKE_CXX_COMPILER_ID} STREQUAL "Clang")
if(${NORM} EQUAL ${CXX_NORM_CXX11})
add_definitions("-std=c++11")
endif()
endif()
endmacro()

View File

@ -1,100 +1,91 @@
add_sources( add_sources(
addressinfo.h array.cpp
array.cpp basicackframe.h
basicackframe.h basiccancelframe.h
basiccancelframe.h basiccancelokframe.h
basiccancelokframe.h basicconsumeframe.h
basicconsumeframe.h basicconsumeokframe.h
basicconsumeokframe.h basicdeliverframe.h
basicdeliverframe.h basicframe.h
basicframe.h basicgetemptyframe.h
basicgetemptyframe.h basicgetframe.h
basicgetframe.h basicgetokframe.h
basicgetokframe.h basicheaderframe.h
basicheaderframe.h basicnackframe.h
basicnackframe.h basicpublishframe.h
basicpublishframe.h basicqosframe.h
basicqosframe.h basicqosokframe.h
basicqosokframe.h basicrecoverasyncframe.h
basicrecoverasyncframe.h basicrecoverframe.h
basicrecoverframe.h basicrecoverokframe.h
basicrecoverokframe.h basicrejectframe.h
basicrejectframe.h basicreturnframe.h
basicreturnframe.h bodyframe.h
bodyframe.h channelcloseframe.h
channelcloseframe.h channelcloseokframe.h
channelcloseokframe.h channelflowframe.h
channelflowframe.h channelflowokframe.h
channelflowokframe.h channelframe.h
channelframe.h channelimpl.cpp
channelimpl.cpp channelopenframe.h
channelopenframe.h channelopenokframe.h
channelopenokframe.h connectioncloseframe.h
connectioncloseframe.h connectioncloseokframe.h
connectioncloseokframe.h connectionframe.h
connectionframe.h connectionimpl.cpp
connectionimpl.cpp connectionopenframe.h
connectionopenframe.h connectionopenokframe.h
connectionopenokframe.h connectionsecureframe.h
connectionsecureframe.h connectionsecureokframe.h
connectionsecureokframe.h connectionstartframe.h
connectionstartframe.h connectionstartokframe.h
connectionstartokframe.h connectiontuneframe.h
connectiontuneframe.h connectiontuneokframe.h
connectiontuneokframe.h consumedmessage.h
consumedmessage.h deferredcancel.cpp
deferredcancel.cpp deferredconsumer.cpp
deferredconsumer.cpp deferredconsumerbase.cpp
deferredconsumerbase.cpp deferredget.cpp
deferredget.cpp exchangebindframe.h
exchangebindframe.h exchangebindokframe.h
exchangebindokframe.h exchangedeclareframe.h
exchangedeclareframe.h exchangedeclareokframe.h
exchangedeclareokframe.h exchangedeleteframe.h
exchangedeleteframe.h exchangedeleteokframe.h
exchangedeleteokframe.h exchangeframe.h
exchangeframe.h exchangeunbindframe.h
exchangeunbindframe.h exchangeunbindokframe.h
exchangeunbindokframe.h extframe.h
extframe.h field.cpp
field.cpp flags.cpp
flags.cpp framecheck.h
framecheck.h headerframe.h
headerframe.h heartbeatframe.h
heartbeatframe.h includes.h
includes.h methodframe.h
methodframe.h passthroughbuffer.h
passthroughbuffer.h protocolheaderframe.h
pipe.h queuebindframe.h
protocolheaderframe.h queuebindokframe.h
queuebindframe.h queuedeclareframe.h
queuebindokframe.h queuedeclareokframe.h
queuedeclareframe.h queuedeleteframe.h
queuedeclareokframe.h queuedeleteokframe.h
queuedeleteframe.h queueframe.h
queuedeleteokframe.h queuepurgeframe.h
queueframe.h queuepurgeokframe.h
queuepurgeframe.h queueunbindframe.h
queuepurgeokframe.h queueunbindokframe.h
queueunbindframe.h receivedframe.cpp
queueunbindokframe.h reducedbuffer.h
receivedframe.cpp returnedmessage.h
reducedbuffer.h table.cpp
returnedmessage.h transactioncommitframe.h
table.cpp transactioncommitokframe.h
tcpclosed.h transactionframe.h
tcpconnected.h transactionrollbackframe.h
tcpconnection.cpp transactionrollbackokframe.h
tcpinbuffer.h transactionselectframe.h
tcpoutbuffer.h transactionselectokframe.h
tcpresolver.h watchable.cpp
tcpstate.h
transactioncommitframe.h
transactioncommitokframe.h
transactionframe.h
transactionrollbackframe.h
transactionrollbackokframe.h
transactionselectframe.h
transactionselectokframe.h
watchable.cpp
) )

View File

@ -1,11 +1,11 @@
CPP = g++ CPP = g++
RM = rm -f RM = rm -f
CPPFLAGS = -Wall -c -I. -std=c++11 -MD CPPFLAGS = -Wall -c -I../include -std=c++11 -MD
LD = g++ LD = g++
LD_FLAGS = -Wall -shared LD_FLAGS = -Wall -shared
SHARED_LIB = lib$(LIBRARY_NAME).so.$(VERSION) SHARED_LIB = lib$(LIBRARY_NAME).so.$(VERSION)
STATIC_LIB = lib$(LIBRARY_NAME).a.$(VERSION) STATIC_LIB = lib$(LIBRARY_NAME).a.$(VERSION)
SOURCES = $(wildcard *.cpp) SOURCES = $(wildcard *.cpp) $(wildcard linux_tcp/*.cpp)
SHARED_OBJECTS = $(SOURCES:%.cpp=%.o) SHARED_OBJECTS = $(SOURCES:%.cpp=%.o)
STATIC_OBJECTS = $(SOURCES:%.cpp=%.s.o) STATIC_OBJECTS = $(SOURCES:%.cpp=%.s.o)
DEPENDENCIES = $(SOURCES:%.cpp=%.d) DEPENDENCIES = $(SOURCES:%.cpp=%.d)
@ -53,4 +53,3 @@ ${SHARED_OBJECTS}:
${STATIC_OBJECTS}: ${STATIC_OBJECTS}:
${CPP} ${CPPFLAGS} -o $@ ${@:%.s.o=%.cpp} ${CPP} ${CPPFLAGS} -o $@ ${@:%.s.o=%.cpp}

View File

@ -29,7 +29,7 @@ Array::Array(ReceivedFrame &frame)
if (!field) continue; if (!field) continue;
// less bytes to read // less bytes to read
charsToRead -= field->size(); charsToRead -= (uint32_t)field->size();
// add the additional field // add the additional field
_fields.push_back(std::shared_ptr<Field>(field)); _fields.push_back(std::shared_ptr<Field>(field));
@ -76,7 +76,7 @@ const Field &Array::get(uint8_t index) const
*/ */
uint32_t Array::count() const uint32_t Array::count() const
{ {
return _fields.size(); return (uint32_t)_fields.size();
} }
/** /**

View File

@ -59,7 +59,7 @@ public:
* @param noWait whether to wait for a response. * @param noWait whether to wait for a response.
*/ */
BasicCancelFrame(uint16_t channel, const std::string& consumerTag, bool noWait = false) : BasicCancelFrame(uint16_t channel, const std::string& consumerTag, bool noWait = false) :
BasicFrame(channel, consumerTag.size() + 2), // 1 for extra string size, 1 for bool BasicFrame(channel, (uint32_t)(consumerTag.size() + 2)), // 1 for extra string size, 1 for bool
_consumerTag(consumerTag), _consumerTag(consumerTag),
_noWait(noWait) {} _noWait(noWait) {}

View File

@ -53,7 +53,7 @@ public:
* @param consumerTag holds the consumertag specified by client or server * @param consumerTag holds the consumertag specified by client or server
*/ */
BasicCancelOKFrame(uint16_t channel, std::string& consumerTag) : BasicCancelOKFrame(uint16_t channel, std::string& consumerTag) :
BasicFrame(channel, consumerTag.length() + 1), // add 1 byte for encoding the size of consumer tag BasicFrame(channel, (uint32_t)(consumerTag.length() + 1)), // add 1 byte for encoding the size of consumer tag
_consumerTag(consumerTag) _consumerTag(consumerTag)
{} {}

View File

@ -86,7 +86,7 @@ public:
* @param filter additional arguments * @param filter additional arguments
*/ */
BasicConsumeFrame(uint16_t channel, const std::string& queueName, const std::string& consumerTag, bool noLocal = false, bool noAck = false, bool exclusive = false, bool noWait = false, const Table& filter = {}) : BasicConsumeFrame(uint16_t channel, const std::string& queueName, const std::string& consumerTag, bool noLocal = false, bool noAck = false, bool exclusive = false, bool noWait = false, const Table& filter = {}) :
BasicFrame(channel, (queueName.length() + consumerTag.length() + 5 + filter.size())), // size of vars, +1 for each shortstring size, +1 for bools, +2 for deprecated value BasicFrame(channel, (uint32_t)(queueName.length() + consumerTag.length() + 5 + filter.size())), // size of vars, +1 for each shortstring size, +1 for bools, +2 for deprecated value
_queueName(queueName), _queueName(queueName),
_consumerTag(consumerTag), _consumerTag(consumerTag),
_bools(noLocal, noAck, exclusive, noWait), _bools(noLocal, noAck, exclusive, noWait),

View File

@ -43,7 +43,7 @@ public:
* @param consumerTag consumertag specified by client of provided by server * @param consumerTag consumertag specified by client of provided by server
*/ */
BasicConsumeOKFrame(uint16_t channel, const std::string& consumerTag) : BasicConsumeOKFrame(uint16_t channel, const std::string& consumerTag) :
BasicFrame(channel, consumerTag.length() + 1), // length of string + 1 for encoding of stringsize BasicFrame(channel, (uint32_t)(consumerTag.length() + 1)), // length of string + 1 for encoding of stringsize
_consumerTag(consumerTag) _consumerTag(consumerTag)
{} {}

View File

@ -13,9 +13,9 @@
* Dependencies * Dependencies
*/ */
#include "basicframe.h" #include "basicframe.h"
#include "../include/stringfield.h" #include "amqpcpp/stringfield.h"
#include "../include/booleanset.h" #include "amqpcpp/booleanset.h"
#include "../include/connectionimpl.h" #include "amqpcpp/connectionimpl.h"
/** /**
* Set up namespace * Set up namespace
@ -87,7 +87,7 @@ public:
* @param routingKey message routing key * @param routingKey message routing key
*/ */
BasicDeliverFrame(uint16_t channel, const std::string& consumerTag, uint64_t deliveryTag, bool redelivered = false, const std::string& exchange = "", const std::string& routingKey = "") : BasicDeliverFrame(uint16_t channel, const std::string& consumerTag, uint64_t deliveryTag, bool redelivered = false, const std::string& exchange = "", const std::string& routingKey = "") :
BasicFrame(channel, (consumerTag.length() + exchange.length() + routingKey.length() + 12)), BasicFrame(channel, (uint32_t)(consumerTag.length() + exchange.length() + routingKey.length() + 12)),
// length of strings + 1 byte per string for stringsize, 8 bytes for uint64_t and 1 for bools // length of strings + 1 byte per string for stringsize, 8 bytes for uint64_t and 1 for bools
_consumerTag(consumerTag), _consumerTag(consumerTag),
_deliveryTag(deliveryTag), _deliveryTag(deliveryTag),

View File

@ -59,7 +59,7 @@ public:
* @param noAck whether server expects acknowledgements for messages * @param noAck whether server expects acknowledgements for messages
*/ */
BasicGetFrame(uint16_t channel, const std::string& queue, bool noAck = false) : BasicGetFrame(uint16_t channel, const std::string& queue, bool noAck = false) :
BasicFrame(channel, queue.length() + 4), // 1 for bool, 1 for string size, 2 for deprecated field BasicFrame(channel, (uint32_t)(queue.length() + 4)), // 1 for bool, 1 for string size, 2 for deprecated field
_queue(queue), _queue(queue),
_noAck(noAck) _noAck(noAck)
{} {}

View File

@ -76,7 +76,7 @@ public:
* @param messageCount number of messages in the queue * @param messageCount number of messages in the queue
*/ */
BasicGetOKFrame(uint16_t channel, uint64_t deliveryTag, bool redelivered, const std::string& exchange, const std::string& routingKey, uint32_t messageCount) : BasicGetOKFrame(uint16_t channel, uint64_t deliveryTag, bool redelivered, const std::string& exchange, const std::string& routingKey, uint32_t messageCount) :
BasicFrame(channel, (exchange.length() + routingKey.length() + 15)), // string length, +1 for each shortsrting length + 8 (uint64_t) + 4 (uint32_t) + 1 (bool) BasicFrame(channel, (uint32_t)(exchange.length() + routingKey.length() + 15)), // string length, +1 for each shortsrting length + 8 (uint64_t) + 4 (uint32_t) + 1 (bool)
_deliveryTag(deliveryTag), _deliveryTag(deliveryTag),
_redelivered(redelivered), _redelivered(redelivered),
_exchange(exchange), _exchange(exchange),

View File

@ -13,10 +13,10 @@
* Dependencies * Dependencies
*/ */
#include "headerframe.h" #include "headerframe.h"
#include "../include/metadata.h" #include "amqpcpp/metadata.h"
#include "../include/envelope.h" #include "amqpcpp/envelope.h"
#include "../include/connectionimpl.h" #include "amqpcpp/connectionimpl.h"
#include "../include/deferredconsumerbase.h" #include "amqpcpp/deferredconsumerbase.h"
/** /**
* Set up namespace * Set up namespace

View File

@ -70,7 +70,7 @@ public:
* @param immediate request immediate delivery @default = false * @param immediate request immediate delivery @default = false
*/ */
BasicPublishFrame(uint16_t channel, const std::string& exchange = "", const std::string& routingKey = "", bool mandatory = false, bool immediate = false) : BasicPublishFrame(uint16_t channel, const std::string& exchange = "", const std::string& routingKey = "", bool mandatory = false, bool immediate = false) :
BasicFrame(channel, exchange.length() + routingKey.length() + 5), // 1 extra per string (for the size), 1 for bools, 2 for deprecated field BasicFrame(channel, (uint32_t)(exchange.length() + routingKey.length() + 5)), // 1 extra per string (for the size), 1 for bools, 2 for deprecated field
_exchange(exchange), _exchange(exchange),
_routingKey(routingKey), _routingKey(routingKey),
_bools(mandatory, immediate) _bools(mandatory, immediate)

View File

@ -67,7 +67,7 @@ public:
* @param routingKey message routing key * @param routingKey message routing key
*/ */
BasicReturnFrame(uint16_t channel, int16_t replyCode, const std::string& replyText = "", const std::string& exchange = "", const std::string& routingKey = "") : BasicReturnFrame(uint16_t channel, int16_t replyCode, const std::string& replyText = "", const std::string& exchange = "", const std::string& routingKey = "") :
BasicFrame(channel, replyText.length() + exchange.length() + routingKey.length() + 5), // 3 for each string (extra size byte), 2 for uint16_t BasicFrame(channel, (uint32_t)(replyText.length() + exchange.length() + routingKey.length() + 5)), // 3 for each string (extra size byte), 2 for uint16_t
_replyCode(replyCode), _replyCode(replyCode),
_replyText(replyText), _replyText(replyText),
_exchange(exchange), _exchange(exchange),

View File

@ -13,8 +13,8 @@
* Dependencies * Dependencies
*/ */
#include "extframe.h" #include "extframe.h"
#include "../include/connectionimpl.h" #include "amqpcpp/connectionimpl.h"
#include "../include/deferredconsumerbase.h" #include "amqpcpp/deferredconsumerbase.h"
/** /**
* Set up namespace * Set up namespace

View File

@ -82,7 +82,7 @@ public:
* @param failingMethod failing method id if applicable * @param failingMethod failing method id if applicable
*/ */
ChannelCloseFrame(uint16_t channel, uint16_t code = 0, const std::string& text = "", uint16_t failingClass = 0, uint16_t failingMethod = 0) : ChannelCloseFrame(uint16_t channel, uint16_t code = 0, const std::string& text = "", uint16_t failingClass = 0, uint16_t failingMethod = 0) :
ChannelFrame(channel, (text.length() + 7)), // sizeof code, failingclass, failingmethod (2byte + 2byte + 2byte) + text length + text length byte ChannelFrame(channel, (uint32_t)(text.length() + 7)), // sizeof code, failingclass, failingmethod (2byte + 2byte + 2byte) + text length + text length byte
_code(code), _code(code),
_text(text), _text(text),
_failingClass(failingClass), _failingClass(failingClass),

View File

@ -481,7 +481,7 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin
uint64_t chunksize = std::min(static_cast<uint64_t>(maxpayload), bytesleft); uint64_t chunksize = std::min(static_cast<uint64_t>(maxpayload), bytesleft);
// send out a body frame // send out a body frame
if (!send(BodyFrame(_id, data + bytessent, chunksize))) return false; if (!send(BodyFrame(_id, data + bytessent, (uint32_t)chunksize))) return false;
// channel still valid? // channel still valid?
if (!monitor.valid()) return false; if (!monitor.valid()) return false;

View File

@ -82,7 +82,7 @@ public:
* @param failingMethod id of the failing method if applicable * @param failingMethod id of the failing method if applicable
*/ */
ConnectionCloseFrame(uint16_t code, const std::string &text, uint16_t failingClass = 0, uint16_t failingMethod = 0) : ConnectionCloseFrame(uint16_t code, const std::string &text, uint16_t failingClass = 0, uint16_t failingMethod = 0) :
ConnectionFrame(text.length() + 7), // 1 for extra string byte, 2 for each uint16 ConnectionFrame((uint32_t)(text.length() + 7)), // 1 for extra string byte, 2 for each uint16
_code(code), _code(code),
_text(text), _text(text),
_failingClass(failingClass), _failingClass(failingClass),

View File

@ -125,7 +125,7 @@ uint64_t ConnectionImpl::parse(const Buffer &buffer)
try try
{ {
// try to recognize the frame // try to recognize the frame
ReducedBuffer reduced_buf(buffer, processed); ReducedBuffer reduced_buf(buffer, (size_t)processed);
ReceivedFrame receivedFrame(reduced_buf, _maxFrame); ReceivedFrame receivedFrame(reduced_buf, _maxFrame);
// do we have the full frame? // do we have the full frame?
@ -146,7 +146,7 @@ uint64_t ConnectionImpl::parse(const Buffer &buffer)
// have the initial bytes of the header, we already know how much // have the initial bytes of the header, we already know how much
// data we need for the next frame, otherwise we need at least 7 // data we need for the next frame, otherwise we need at least 7
// bytes for processing the header of the next frame // bytes for processing the header of the next frame
_expected = receivedFrame.header() ? receivedFrame.totalSize() : 7; _expected = receivedFrame.header() ? (uint32_t)receivedFrame.totalSize() : 7;
// we're ready for now // we're ready for now
return processed; return processed;

View File

@ -60,7 +60,7 @@ public:
* @param vhost name of virtual host to open * @param vhost name of virtual host to open
*/ */
ConnectionOpenFrame(const std::string &vhost) : ConnectionOpenFrame(const std::string &vhost) :
ConnectionFrame(vhost.length() + 3), // length of vhost + byte to encode this length + deprecated shortstring size + deprecated bool ConnectionFrame((uint32_t)(vhost.length() + 3)), // length of vhost + byte to encode this length + deprecated shortstring size + deprecated bool
_vhost(vhost), _vhost(vhost),
_deprecatedCapabilities(""), _deprecatedCapabilities(""),
_deprecatedInsist() _deprecatedInsist()

View File

@ -43,7 +43,7 @@ public:
* @param challenge the challenge * @param challenge the challenge
*/ */
ConnectionSecureFrame(const std::string& challenge) : ConnectionSecureFrame(const std::string& challenge) :
ConnectionFrame(challenge.length() + 4), // 4 for the length of the challenge (uint32_t) ConnectionFrame((uint32_t)(challenge.length() + 4)), // 4 for the length of the challenge (uint32_t)
_challenge(challenge) _challenge(challenge)
{} {}

View File

@ -43,7 +43,7 @@ public:
* @param response the challenge response * @param response the challenge response
*/ */
ConnectionSecureOKFrame(const std::string& response) : ConnectionSecureOKFrame(const std::string& response) :
ConnectionFrame(response.length() + 4), //response length + uint32_t for encoding the length ConnectionFrame((uint32_t)(response.length() + 4)), //response length + uint32_t for encoding the length
_response(response) _response(response)
{} {}

View File

@ -81,7 +81,7 @@ public:
* @param locales available locales * @param locales available locales
*/ */
ConnectionStartFrame(uint8_t major, uint8_t minor, const Table& properties, const std::string& mechanisms, const std::string& locales) : ConnectionStartFrame(uint8_t major, uint8_t minor, const Table& properties, const std::string& mechanisms, const std::string& locales) :
ConnectionFrame((properties.size() + mechanisms.length() + locales.length() + 10)), // 4 for each longstring (size-uint32), 2 major/minor ConnectionFrame((uint32_t)(properties.size() + mechanisms.length() + locales.length() + 10)), // 4 for each longstring (size-uint32), 2 major/minor
_major(major), _major(major),
_minor(minor), _minor(minor),
_properties(properties), _properties(properties),

View File

@ -82,7 +82,7 @@ public:
* @param locale selected locale. * @param locale selected locale.
*/ */
ConnectionStartOKFrame(const Table& properties, const std::string& mechanism, const std::string& response, const std::string& locale) : ConnectionStartOKFrame(const Table& properties, const std::string& mechanism, const std::string& response, const std::string& locale) :
ConnectionFrame((properties.size() + mechanism.length() + response.length() + locale.length() + 6)), // 1 byte extra per shortstring, 4 per longstring ConnectionFrame((uint32_t)(properties.size() + mechanism.length() + response.length() + locale.length() + 6)), // 1 byte extra per shortstring, 4 per longstring
_properties(properties), _properties(properties),
_mechanism(mechanism), _mechanism(mechanism),
_response(response), _response(response),

View File

@ -10,7 +10,7 @@
/** /**
* Dependencies * Dependencies
*/ */
#include "../include/deferredconsumerbase.h" #include "amqpcpp/deferredconsumerbase.h"
#include "basicdeliverframe.h" #include "basicdeliverframe.h"
#include "basicgetokframe.h" #include "basicgetokframe.h"
#include "basicheaderframe.h" #include "basicheaderframe.h"

View File

@ -95,7 +95,7 @@ public:
* @param arguments * @param arguments
*/ */
ExchangeBindFrame(uint16_t channel, const std::string &destination, const std::string &source, const std::string &routingKey, bool noWait, const Table &arguments) : ExchangeBindFrame(uint16_t channel, const std::string &destination, const std::string &source, const std::string &routingKey, bool noWait, const Table &arguments) :
ExchangeFrame(channel, (destination.length() + source.length() + routingKey.length() + arguments.size() + 6)), // 1 for each string, 1 for booleanset, 2 for deprecated field ExchangeFrame(channel, (uint32_t)(destination.length() + source.length() + routingKey.length() + arguments.size() + 6)), // 1 for each string, 1 for booleanset, 2 for deprecated field
_destination(destination), _destination(destination),
_source(source), _source(source),
_routingKey(routingKey), _routingKey(routingKey),

View File

@ -81,7 +81,7 @@ public:
* @param arguments additional arguments * @param arguments additional arguments
*/ */
ExchangeDeclareFrame(uint16_t channel, const std::string& name, const std::string& type, bool passive, bool durable, bool noWait, const Table& arguments) : ExchangeDeclareFrame(uint16_t channel, const std::string& name, const std::string& type, bool passive, bool durable, bool noWait, const Table& arguments) :
ExchangeFrame(channel, (name.length() + type.length() + arguments.size() + 5)), // size of name, type and arguments + 1 (all booleans are stored in 1 byte) + 2 (deprecated short) + 2 (string sizes) ExchangeFrame(channel, (uint32_t)(name.length() + type.length() + arguments.size() + 5)), // size of name, type and arguments + 1 (all booleans are stored in 1 byte) + 2 (deprecated short) + 2 (string sizes)
_name(name), _name(name),
_type(type), _type(type),
_bools(passive, durable, false, false, noWait), _bools(passive, durable, false, false, noWait),

View File

@ -73,7 +73,7 @@ public:
* @param bool noWait Do not wait for a response * @param bool noWait Do not wait for a response
*/ */
ExchangeDeleteFrame(uint16_t channel, const std::string& name, bool ifUnused = false, bool noWait = false) : ExchangeDeleteFrame(uint16_t channel, const std::string& name, bool ifUnused = false, bool noWait = false) :
ExchangeFrame(channel, name.length() + 4), // length of the name, 1 byte for encoding this length, 1 for bools, 2 for deprecated short ExchangeFrame(channel, (uint32_t)(name.length() + 4)), // length of the name, 1 byte for encoding this length, 1 for bools, 2 for deprecated short
_name(name), _name(name),
_bools(ifUnused, noWait) _bools(ifUnused, noWait)
{} {}

View File

@ -95,7 +95,7 @@ public:
* @param arguments * @param arguments
*/ */
ExchangeUnbindFrame(uint16_t channel, const std::string &destination, const std::string &source, const std::string &routingKey, bool noWait, const Table &arguments) : ExchangeUnbindFrame(uint16_t channel, const std::string &destination, const std::string &source, const std::string &routingKey, bool noWait, const Table &arguments) :
ExchangeFrame(channel, (destination.length() + source.length() + routingKey.length() + arguments.size() + 6)), // 1 for each string, 1 for booleanset, 2 for deprecated field ExchangeFrame(channel, (uint32_t)(destination.length() + source.length() + routingKey.length() + arguments.size() + 6)), // 1 for each string, 1 for booleanset, 2 for deprecated field
_destination(destination), _destination(destination),
_source(source), _source(source),
_routingKey(routingKey), _routingKey(routingKey),

View File

@ -19,8 +19,8 @@
/** /**
* Dependencies * Dependencies
*/ */
#include "../include/frame.h" #include "amqpcpp/frame.h"
#include "../include/receivedframe.h" #include "amqpcpp/receivedframe.h"
/** /**
* Set up namespace * Set up namespace

View File

@ -50,7 +50,7 @@ public:
virtual ~FrameCheck() virtual ~FrameCheck()
{ {
// update the number of bytes to skip // update the number of bytes to skip
_frame->_skip += _size; _frame->_skip += (uint32_t)_size;
} }
}; };

View File

@ -9,7 +9,7 @@
// c and c++ dependencies // c and c++ dependencies
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h> // TODO cstring
#include <stdint.h> #include <stdint.h>
#include <string> #include <string>
#include <memory> #include <memory>
@ -21,67 +21,69 @@
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
#include <queue> #include <queue>
#include <sys/types.h>
#include <sys/socket.h> #include <sys/types.h> // TODO is this needed
#include <netdb.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <functional> #include <functional>
#include <stdexcept> #include <stdexcept>
// TODO make this nice
#ifdef _MSC_VER
//not #if defined(_WIN32) || defined(_WIN64) because we have strncasecmp in mingw
#define strncasecmp _strnicmp
#define strcasecmp _stricmp
#endif
// forward declarations // forward declarations
#include "../include/classes.h" #include "amqpcpp/classes.h"
// utility classes // utility classes
#include "../include/endian.h" #include "amqpcpp/endian.h"
#include "../include/buffer.h" #include "amqpcpp/buffer.h"
#include "../include/bytebuffer.h" #include "amqpcpp/bytebuffer.h"
#include "../include/receivedframe.h" #include "amqpcpp/receivedframe.h"
#include "../include/outbuffer.h" #include "amqpcpp/outbuffer.h"
#include "../include/copiedbuffer.h" #include "amqpcpp/copiedbuffer.h"
#include "../include/watchable.h" #include "amqpcpp/watchable.h"
#include "../include/monitor.h" #include "amqpcpp/monitor.h"
#include "../include/tcpdefines.h"
// amqp types // amqp types
#include "../include/field.h" #include "amqpcpp/field.h"
#include "../include/numericfield.h" #include "amqpcpp/numericfield.h"
#include "../include/decimalfield.h" #include "amqpcpp/decimalfield.h"
#include "../include/stringfield.h" #include "amqpcpp/stringfield.h"
#include "../include/booleanset.h" #include "amqpcpp/booleanset.h"
#include "../include/fieldproxy.h" #include "amqpcpp/fieldproxy.h"
#include "../include/table.h" #include "amqpcpp/table.h"
#include "../include/array.h" #include "amqpcpp/array.h"
// envelope for publishing and consuming // envelope for publishing and consuming
#include "../include/metadata.h" #include "amqpcpp/metadata.h"
#include "../include/envelope.h" #include "amqpcpp/envelope.h"
#include "../include/message.h" #include "amqpcpp/message.h"
// mid level includes // mid level includes
#include "../include/exchangetype.h" #include "amqpcpp/exchangetype.h"
#include "../include/flags.h" #include "amqpcpp/flags.h"
#include "../include/callbacks.h" #include "amqpcpp/callbacks.h"
#include "../include/deferred.h" #include "amqpcpp/deferred.h"
#include "../include/deferredconsumer.h" #include "amqpcpp/deferredconsumer.h"
#include "../include/deferredqueue.h" #include "amqpcpp/deferredqueue.h"
#include "../include/deferreddelete.h" #include "amqpcpp/deferreddelete.h"
#include "../include/deferredcancel.h" #include "amqpcpp/deferredcancel.h"
#include "../include/deferredget.h" #include "amqpcpp/deferredget.h"
#include "../include/channelimpl.h" #include "amqpcpp/channelimpl.h"
#include "../include/channel.h" #include "amqpcpp/channel.h"
#include "../include/login.h" #include "amqpcpp/login.h"
#include "../include/address.h" #include "amqpcpp/address.h"
#include "../include/connectionhandler.h" #include "amqpcpp/connectionhandler.h"
#include "../include/connectionimpl.h" #include "amqpcpp/connectionimpl.h"
#include "../include/connection.h" #include "amqpcpp/connection.h"
#include "../include/tcphandler.h"
#include "../include/tcpconnection.h"
// classes that are very commonly used // classes that are very commonly used
#include "../include/exception.h" #include "amqpcpp/exception.h"
#include "../include/protocolexception.h" #include "amqpcpp/protocolexception.h"
#include "../include/frame.h" #include "amqpcpp/frame.h"
#include "extframe.h" #include "extframe.h"
#include "methodframe.h" #include "methodframe.h"
#include "headerframe.h" #include "headerframe.h"
@ -91,6 +93,5 @@
#include "queueframe.h" #include "queueframe.h"
#include "basicframe.h" #include "basicframe.h"
#include "transactionframe.h" #include "transactionframe.h"
#include "addressinfo.h"

View File

@ -0,0 +1,12 @@
add_sources(
addressinfo.h
includes.h
pipe.h
tcpclosed.h
tcpconnected.h
tcpconnection.cpp
tcpinbuffer.h
tcpoutbuffer.h
tcpresolver.h
tcpstate.h
)

Some files were not shown because too many files have changed in this diff Show More