diff --git a/.gitignore b/.gitignore index 4a62ba7..3e127e5 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,8 @@ *.la *.a *.a.* +/build +/.vscode +.atom-build.cson +.atom-dbg.cson +/bin \ No newline at end of file diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..d3be12c --- /dev/null +++ b/.travis.yml @@ -0,0 +1,91 @@ +################ +# Config +################ + +# C++ project +language: cpp + +dist: trusty +sudo: required +group: edge + + +################ +# Services +################ + +services: + - docker + +################ +# Build matrix +################ + +matrix: + include: + + ################ + # Linux / GCC + ################ + + - os: linux + env: + - COMPILER_PACKAGE=g++-6 + - C_COMPILER=gcc-6 + - CXX_COMPILER=g++-6 + + - os: linux + compiler: gcc + env: + - COMPILER_PACKAGE=g++-7 + - C_COMPILER=gcc-7 + - CXX_COMPILER=g++-7 + + ################ + # Linux / Clang + ################ + + - os: linux + env: + - COMPILER_PACKAGE=clang-4.0 + - C_COMPILER=clang-4.0 + - CXX_COMPILER=clang++-4.0 + + - os: linux + env: + - COMPILER_PACKAGE=clang-5.0 + - C_COMPILER=clang-5.0 + - CXX_COMPILER=clang++-5.0 + +before_install: + + # Show OS/compiler version (this may not be the same OS as the Docker container) + - uname -a + + # Use an artful container - gives us access to latest compilers. + - docker run -d --name ubuntu-test-container -v $(pwd):/travis ubuntu:artful tail -f /dev/null + - docker ps + + +install: + + # Create our container + - docker exec -t ubuntu-test-container bash -c "apt-get update -y && + apt-get --no-install-recommends install -y software-properties-common cmake + ninja-build libboost-all-dev libev-dev libuv1-dev ninja-build $COMPILER_PACKAGE && + apt-get -y clean && rm -rf /var/lib/apt/lists/*" + +################ +# Build / Test +################ + +script: + + # Run the container that we created and build the code + - docker exec -t ubuntu-test-container bash -c "cd /travis && + export CC=/usr/bin/$C_COMPILER && + export CXX=/usr/bin/$CXX_COMPILER && + mkdir build.release && cd build.release && + cmake .. ${CMAKE_OPTIONS} -DAMQP-CPP_BUILD_EXAMPLES=ON -DAMQP-CPP_LINUX_TCP=ON -GNinja && + cmake --build . --config Release && + cd .." diff --git a/CMakeLists.txt b/CMakeLists.txt index 3c73267..9bcb914 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,11 +1,36 @@ -cmake_minimum_required(VERSION 2.8) +# Builds AMQP-CPP +# +# Options: +# +# - AMQP-CPP_BUILD_SHARED (default OFF) +# ON: Build shared lib +# OFF: Build static lib +# +# - AMQP-CPP_LINUX_TCP (default OFF) +# ON: Build posix handler implementation +# OFF: Don't build posix handler implementation +cmake_minimum_required(VERSION 3.1) + +# project name project(amqpcpp) -# ensure c++11 on all compilers -include(set_cxx_norm.cmake) -set_cxx_norm (${CXX_NORM_CXX11}) +# build options +option(AMQP-CPP_BUILD_SHARED "Build shared library. If off, build will be static." OFF) +option(AMQP-CPP_LINUX_TCP "Build linux sockets implementation." OFF) +option(AMQP-CPP_BUILD_EXAMPLES "Build amqpcpp examples" OFF) +# ensure c++11 on all compilers +set (CMAKE_CXX_STANDARD 11) + +# add source files +# ------------------------------------------------------------------------------------------------------ + +# set include/ as include directory +include_directories(${CMAKE_SOURCE_DIR}/include) + +# macro that adds a list of provided source files to a list called SRCS. +# if variable SRCS does not yet exist, it is created. macro (add_sources) file (RELATIVE_PATH _relPath "${PROJECT_SOURCE_DIR}" "${CMAKE_CURRENT_SOURCE_DIR}") foreach (_src ${ARGN}) @@ -21,36 +46,59 @@ macro (add_sources) endif() endmacro() +# add source files add_subdirectory(src) -add_subdirectory(include) - -option(BUILD_SHARED "build shared library" OFF) - -if(BUILD_SHARED) - add_library(amqpcpp SHARED ${SRCS}) - set_target_properties(amqpcpp PROPERTIES SOVERSION 2.7) - install(TARGETS amqpcpp - LIBRARY DESTINATION lib - ) -else() - add_library(amqpcpp STATIC ${SRCS}) - install(TARGETS amqpcpp - ARCHIVE DESTINATION lib - ) -endif() -Include_directories(${PROJECT_SOURCE_DIR}) -install(DIRECTORY include/ DESTINATION include/amqpcpp - FILES_MATCHING PATTERN "*.h") -install(FILES amqpcpp.h DESTINATION include) - -option(BUILD_TUTORIALS "build rabbitmq tutorials" OFF) -if(BUILD_TUTORIALS) -# add_subdirectory(examples/rabbitmq_tutorials) +if(AMQP-CPP_LINUX_TCP) + add_subdirectory(src/linux_tcp) endif() -set(AMQP-CPP_INCLUDE_PATH ${CMAKE_CURRENT_SOURCE_DIR}) -set(AMQP-CPP_INCLUDE_PATH ${CMAKE_CURRENT_SOURCE_DIR} PARENT_SCOPE) +# potentially build the examples +if(AMQP-CPP_BUILD_EXAMPLES) + add_subdirectory(examples) +endif() +# settings for specific compilers +# ------------------------------------------------------------------------------------------------------ + +# we have to prevent windows from defining the max macro. if (WIN32) - add_definitions(-DNOMINMAX -DWIN32_LEAN_AND_MEAN) + add_definitions(-DNOMINMAX) endif() + +# build targets +# ------------------------------------------------------------------------------------------------------ + +# set output directory +set(LIBRARY_OUTPUT_PATH ${CMAKE_SOURCE_DIR}/bin) + +if(AMQP-CPP_BUILD_SHARED) + # create shared lib + add_library(${PROJECT_NAME} SHARED ${SRCS}) + # set shared lib version + set_target_properties(${PROJECT_NAME} PROPERTIES SOVERSION 2.8) +else() + # create static lib + add_library(${PROJECT_NAME} STATIC ${SRCS}) +endif() + +# install rules +# ------------------------------------------------------------------------------------------------------ + +if(AMQP-CPP_BUILD_SHARED) + # copy shared lib and its static counter part + install(TARGETS ${PROJECT_NAME} + ARCHIVE DESTINATION lib + LIBRARY DESTINATION lib + RUNTIME DESTINATION lib + ) +else() + # copy static lib + install(TARGETS ${PROJECT_NAME} + ARCHIVE DESTINATION lib + ) +endif() + +# copy header files +install(DIRECTORY include/amqpcpp/ DESTINATION include/amqpcpp + FILES_MATCHING PATTERN "*.h") +install(FILES include/amqpcpp.h DESTINATION include) diff --git a/Makefile b/Makefile index 9f1fe75..9af8712 100644 --- a/Makefile +++ b/Makefile @@ -25,9 +25,11 @@ clean: install: mkdir -p ${INCLUDE_DIR}/$(LIBRARY_NAME) + mkdir -p ${INCLUDE_DIR}/$(LIBRARY_NAME)/linux_tcp mkdir -p ${LIBRARY_DIR} - cp -f $(LIBRARY_NAME).h ${INCLUDE_DIR} - cp -f include/*.h ${INCLUDE_DIR}/$(LIBRARY_NAME) + cp -f include/$(LIBRARY_NAME).h ${INCLUDE_DIR} + cp -f include/amqpcpp/*.h ${INCLUDE_DIR}/$(LIBRARY_NAME) + cp -f include/amqpcpp/linux_tcp/*.h ${INCLUDE_DIR}/$(LIBRARY_NAME)/linux_tcp -cp -f src/lib$(LIBRARY_NAME).so.$(VERSION) ${LIBRARY_DIR} -cp -f src/lib$(LIBRARY_NAME).a.$(VERSION) ${LIBRARY_DIR} ln -r -s -f $(LIBRARY_DIR)/lib$(LIBRARY_NAME).so.$(VERSION) $(LIBRARY_DIR)/lib$(LIBRARY_NAME).so.$(SONAME) diff --git a/README.md b/README.md index 8fc0303..ef0178e 100644 --- a/README.md +++ b/README.md @@ -1,37 +1,39 @@ AMQP-CPP ======== +[![Build Status](https://travis-ci.org/CopernicaMarketingSoftware/AMQP-CPP.svg?branch=master)](https://travis-ci.org/CopernicaMarketingSoftware/AMQP-CPP) + AMQP-CPP is a C++ library for communicating with a RabbitMQ message broker. The library can be used to parse incoming data from a RabbitMQ server, and to generate frames that can be sent to a RabbitMQ server. -This library has a layered architecture, and allows you - if you like - to -completely take care of the network layer. If you want to set up and manage the +This library has a layered architecture, and allows you - if you like - to +completely take care of the network layer. If you want to set up and manage the network connections yourself, the AMQP-CPP library will not make a connection to -RabbitMQ by itself, nor will it create sockets and/or perform IO operations. As -a user of this library, you create the socket connection and implement a certain -interface that you pass to the AMQP-CPP library and that the library will use +RabbitMQ by itself, nor will it create sockets and/or perform IO operations. As +a user of this library, you create the socket connection and implement a certain +interface that you pass to the AMQP-CPP library and that the library will use for IO operations. -Intercepting this network layer is however optional, the AMQP-CPP library also -comes with a predefined Tcp module that can be used if you trust the AMQP library +Intercepting this network layer is however optional, the AMQP-CPP library also +comes with a predefined Tcp module that can be used if you trust the AMQP library to take care of the network handling. In that case, the AMQP-CPP library does all the system calls to set up network connections and send and receive the data. -This layered architecture makes the library extremely flexible and portable: it -does not necessarily rely on operating system specific IO calls, and can be +This layered architecture makes the library extremely flexible and portable: it +does not necessarily rely on operating system specific IO calls, and can be easily integrated into any kind of event loop. If you want to implement the AMQP -protocol on top of some [unusual other communication layer](https://tools.ietf.org/html/rfc1149), -this library can be used for that - but if you want to use it with regular TCP -connections, setting it up is just as easy. +protocol on top of some [unusual other communication layer](https://tools.ietf.org/html/rfc1149), +this library can be used for that - but if you want to use it with regular TCP +connections, setting it up is just as easy. -AMQP-CPP is fully asynchronous and does not do any blocking (system) calls, so +AMQP-CPP is fully asynchronous and does not do any blocking (system) calls, so it can be used in high performance applications without the need for threads. -The AMQP-CPP library uses C++11 features, so if you intend use it, please make +The AMQP-CPP library uses C++11 features, so if you intend use it, please make sure that your compiler is up-to-date and supports C++11. -**Note for the reader:** This readme file has a peculiar structure. We start +**Note for the reader:** This readme file has a peculiar structure. We start explaining the pure and hard core low level interface in which you have to take care of opening socket connections yourself. In reality, you probably want to use the simpler TCP interface that is being described [later on](#tcp-connections). @@ -41,13 +43,13 @@ ABOUT ===== This library is created and maintained by Copernica (www.copernica.com), and is -used inside the MailerQ (www.mailerq.com), Yothalot (www.yothalot.com) and -AMQPipe (www.amqpipe.com) applications. MailerQ is a tool for sending large +used inside the MailerQ (www.mailerq.com), Yothalot (www.yothalot.com) and +AMQPipe (www.amqpipe.com) applications. MailerQ is a tool for sending large volumes of email, using AMQP message queues, Yothalot is a big data processing -map/reduce framework and AMQPipe is a tool for high-speed processing messages +map/reduce framework and AMQPipe is a tool for high-speed processing messages between AMQP pipes -Do you appreciate our work and are you looking for high quality email solutions? +Do you appreciate our work and are you looking for high quality email solutions? Then check out our other commercial and open source solutions: * Copernica Marketing Suite (www.copernica.com) @@ -62,24 +64,45 @@ Then check out our other commercial and open source solutions: INSTALLING ========== +AMQP-CPP comes with an optional Linux-only TCP module that takes care of the network part required for the AMQP-CPP core library. If you use this module, you are required to link with `pthread`. -If you are on some kind of Linux environment, installing the library is as easy +There are two methods to compile AMQP-CPP: CMake and Make. CMake is platform portable, but the Makefile only works on Linux. Building of a shared library is currently not supported on Windows. + +After building there are two relevant files to include when using the library. + +File|Include when? +----|------------ +amqpcpp.h|Always +amqpcpp/linux_tcp.h|If using the Linux-only TCP module + +On Windows you are required to define `NOMINMAX` when compiling code that includes public AMQP-CPP header files. + +## CMake +The CMake file supports both building and installing. You can choose not to use the install functionality, and instead manually use the build output at `bin/`. Keep in mind that the TCP module is only supported for Linux. An example install method would be: +``` bash +mkdir build +cd build +cmake .. [-DBUILD_SHARED] [-DLINUX_TCP] +cmake --build .. --target install +``` + +Option|Default|Meaning +------|-------|------- +BUILD_SHARED|OFF|Static lib(ON) or shared lib(OFF)? Shared is not supported on Windows. +LINUX_TCP|OFF|Should the Linux-only TCP module be built? + +## Make +Installing the library is as easy as running `make` and `make install`. This will install the full version of the AMQP-CPP, including the system specific TCP module. If you do not need the -additional TCP module (because you take care of handling the network stuff -yourself), you can also compile a pure form of the library. Use `make pure` +additional TCP module (because you take care of handling the network stuff +yourself), you can also compile a pure form of the library. Use `make pure` and `make install` for that. -For users on a non-Linux environment: this library is known to work on your -environment too (after all, it does not do any operating specific system calls), -but it might take some extra effort to get your compiler to compile it. Please -send in your pull requests once you have it running, so that others can benefit -from your experiences. - When you compile an application that uses the AMQP-CPP library, do not forget to link with the library. For gcc and clang the linker flag is -lamqpcpp. -If you use the fullblown version of AMQP-CPP (with the TCP module), you also -need to pass a -lpthread linker flag, because the TCP module uses a thread +If you use the fullblown version of AMQP-CPP (with the TCP module), you also +need to pass a -lpthread linker flag, because the TCP module uses a thread for running an asynchronous and non-blocking DNS hostname lookup. @@ -87,11 +110,11 @@ HOW TO USE AMQP-CPP =================== As we mentioned above, the library can be used in a network-agnostic fashion. -It then does not do any IO by itself, and you need to pass an object to the +It then does not do any IO by itself, and you need to pass an object to the library that the library can use for IO. So, before you start using the -library, you first you need to create a class that extends from the -ConnectionHandler base class. This is a class with a number of methods that are -called by the library every time it wants to send out data, or when it needs to +library, you first you need to create a class that extends from the +ConnectionHandler base class. This is a class with a number of methods that are +called by the library every time it wants to send out data, or when it needs to inform you that an error occured. ````c++ @@ -204,16 +227,16 @@ example call the "send()" or "write()" system call to send out the data to the RabbitMQ server. But what about data in the other direction? How does the library receive data back from RabbitMQ? -In this raw setup, the AMQP-CPP library does not do any IO by itself and it is -therefore also not possible for the library to receive data from a -socket. It is again up to you to do this. If, for example, you notice in your -event loop that the socket that is connected with the RabbitMQ server becomes -readable, you should read out that socket (for example by using the recv() system -call), and pass the received bytes to the AMQP-CPP library. This is done by +In this raw setup, the AMQP-CPP library does not do any IO by itself and it is +therefore also not possible for the library to receive data from a +socket. It is again up to you to do this. If, for example, you notice in your +event loop that the socket that is connected with the RabbitMQ server becomes +readable, you should read out that socket (for example by using the recv() system +call), and pass the received bytes to the AMQP-CPP library. This is done by calling the parse() method in the Connection object. The Connection::parse() method gets two parameters, a pointer to a buffer of -data that you just read from the socket, and a parameter that holds the size of +data that you just read from the socket, and a parameter that holds the size of this buffer. The code snippet below comes from the Connection.h C++ header file. ````c++ @@ -249,9 +272,9 @@ both the old data, and the new data. To optimize your calls to the parse() method, you _could_ use the Connection::expected() and Connection::maxFrame() methods. The expected() method returns the number of bytes that the library prefers to receive next. It is pointless to call the parse() method -with a smaller buffer, and it is best to call the method with a buffer of exactly this -size. The maxFrame() returns the max frame size for AMQP messages. If you read your -messages into a reusable buffer, you could allocate this buffer up to this size, so that +with a smaller buffer, and it is best to call the method with a buffer of exactly this +size. The maxFrame() returns the max frame size for AMQP messages. If you read your +messages into a reusable buffer, you could allocate this buffer up to this size, so that you never will have to reallocate. @@ -259,17 +282,17 @@ TCP CONNECTIONS =============== Although the AMQP-CPP library gives you extreme flexibility by letting you setup -your own network connections, the reality is that most if not all AMQP connections -use the TCP protocol. To help you out, the library therefore also comes with a +your own network connections, the reality is that most if not all AMQP connections +use the TCP protocol. To help you out, the library therefore also comes with a TCP module that takes care of setting up the network connections, and sending -and receiving the data. +and receiving the data. If you want to use this TCP module, you should not use the AMQP::Connection and AMQP::Channel classes that you saw above, but the alternative AMQP::TcpConnection and AMQP::TcpChannel classes instead. You also do not have to create your own class -that implements the "AMQP::ConnectionHandler" interface - but a class that inherits -from "AMQP::TcpHandler" instead. You especially need to implement the "monitor()" -method in that class, as that is needed by the AMQP-CPP library to interact with +that implements the "AMQP::ConnectionHandler" interface - but a class that inherits +from "AMQP::TcpHandler" instead. You especially need to implement the "monitor()" +method in that class, as that is needed by the AMQP-CPP library to interact with the main event loop: ````c++ @@ -334,7 +357,7 @@ class MyTcpHandler : public AMQP::TcpHandler // descriptor to the main application event loop (like the select() or // poll() loop). When the event loop reports that the descriptor becomes // readable and/or writable, it is up to you to inform the AMQP-CPP - // library that the filedescriptor is active by calling the + // library that the filedescriptor is active by calling the // connection->process(fd, flags) method. } }; @@ -344,8 +367,8 @@ The "monitor()" method can be used to integrate the AMQP filedescriptors in your application's event loop. For some popular event loops (libev, libevent), we have already added example handler objects (see the next section for that). -Using the TCP module of the AMQP-CPP library is easier than using the -raw AMQP::Connection and AMQP::Channel objects, because you do not have to +Using the TCP module of the AMQP-CPP library is easier than using the +raw AMQP::Connection and AMQP::Channel objects, because you do not have to create the sockets and connections yourself, and you also do not have to take care of buffering network data. @@ -376,9 +399,9 @@ EXISTING EVENT LOOPS Both the pure AMQP::Connection as well as the easier AMQP::TcpConnection class allow you to integrate AMQP-CPP in your own event loop. Whether you take care -of running the event loop yourself (for example by using the select() system -call), or if you use an existing library for it (like libevent, libev or libuv), -you can implement the "monitor()" method to watch the file descriptors and +of running the event loop yourself (for example by using the select() system +call), or if you use an existing library for it (like libevent, libev or libuv), +you can implement the "monitor()" method to watch the file descriptors and hand over control back to AMQP-CPP when one of the sockets become active. For libev and libevent users, we have even implemented an example implementation, @@ -394,26 +417,26 @@ int main() { // access to the event loop auto *loop = EV_DEFAULT; - + // handler for libev (so we don't have to implement AMQP::TcpHandler!) AMQP::LibEvHandler handler(loop); - + // make a connection AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://localhost/")); - + // we need a channel too AMQP::TcpChannel channel(&connection); - + // create a temporary queue channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) { - + // report the name of the temporary queue std::cout << "declared queue " << name << std::endl; - + // now we can close the connection connection.close(); }); - + // run the loop ev_run(loop, 0); @@ -426,7 +449,7 @@ The AMQP::LibEvHandler and AMQP::LibEventHandler classes are extended AMQP::TcpH classes, with an implementation of the monitor() method that simply adds the filedescriptor to the event loop. If you use this class however, it is recommended not to instantiate it directly (like we did in the example), but to create your own -"MyHandler" class that extends from it, and in which you also implement the +"MyHandler" class that extends from it, and in which you also implement the onError() method to report possible connection errors to your end users. Currently, we have example TcpHandler implementations for libev, @@ -443,15 +466,15 @@ such examples. HEARTBEATS ========== -The AMQP protocol supports *heartbeats*. If this heartbeat feature is enabled, the -client and the server negotiate a heartbeat interval during connection setup, and -they agree to send at least *some kind of data* over the connection during every -iteration of that interval. The normal data that is sent over the connection (like -publishing or consuming messages) is normally sufficient to keep the connection alive, +The AMQP protocol supports *heartbeats*. If this heartbeat feature is enabled, the +client and the server negotiate a heartbeat interval during connection setup, and +they agree to send at least *some kind of data* over the connection during every +iteration of that interval. The normal data that is sent over the connection (like +publishing or consuming messages) is normally sufficient to keep the connection alive, but if the client or server was idle during the negotiated interval time, a dummy heartbeat message must be sent instead. -The default behavior of the AMQP-CPP library is to disable heartbeats. The +The default behavior of the AMQP-CPP library is to disable heartbeats. The proposed heartbeat interval of the server during connection setup (the server normally suggests an interval of 60 seconds) is vetoed by the AMQP-CPP library so no heartbeats are ever needed to be sent over the connection. This means that you @@ -460,7 +483,7 @@ lasting algorithms after you've consumed a message from RabbitMQ, without having to worry about the connection being idle for too long. You can however choose to enable these heartbeats. If you want to enable heartbeats, -simple implement the onNegotiate() method inside your ConnectionHandler or +simple implement the onNegotiate() method inside your ConnectionHandler or TcpHandler class and have it return the interval that you find appropriate. ````c++ @@ -478,14 +501,14 @@ class MyTcpHandler : public AMQP::TcpHandler */ virtual void onNegotiate(AMQP::TcpConnection *connection, uint16_t interval) { - // we accept the suggestion from the server, but if the interval is smaller + // we accept the suggestion from the server, but if the interval is smaller // that one minute, we will use a one minute interval instead if (interval < 60) interval = 60; // @todo // set a timer in your event loop, and make sure that you call // connection->heartbeat() every _interval_ seconds. - + // return the interval that we want to use return interval; } @@ -503,7 +526,7 @@ In the libev event loop implementation the heartbeats are enabled by default. CHANNELS ======== -In the above example we created a channel object. A channel is a sort of virtual +In the above example we created a channel object. A channel is a sort of virtual connection, and it is possible to create many channels that all use the same connection. @@ -517,8 +540,8 @@ documented. All operations that you can perform on a channel are non-blocking. This means that it is not possible for a method (like Channel::declareExchange()) to -immediately return 'true' or 'false'. Instead, almost every method of the Channel -class returns an instance of the 'Deferred' class. This 'Deferred' object can be +immediately return 'true' or 'false'. Instead, almost every method of the Channel +class returns an instance of the 'Deferred' class. This 'Deferred' object can be used to install handlers that will be called in case of success or failure. For example, if you call the channel.declareExchange() method, the AMQP-CPP library @@ -610,7 +633,7 @@ channel object right after it was constructed. CHANNEL ERRORS ============== -It is important to realize that any error that occurs on a channel, +It is important to realize that any error that occurs on a channel, invalidates the entire channel, including all subsequent instructions that were already sent over it. This means that if you call multiple methods in a row, and the first method fails, all subsequent methods will not be executed either: @@ -642,7 +665,7 @@ requires and extra instruction to be sent to the RabbitMQ server, so some extra bytes are sent over the network, and some additional resources in both the client application and the RabbitMQ server are used (although this is all very limited). -If possible, it is best to make use of this feature. For example, if you have an important AMQP +If possible, it is best to make use of this feature. For example, if you have an important AMQP connection that you use for consuming messages, and at the same time you want to send another instruction to RabbitMQ (like declaring a temporary queue), it is best to set up a new channel for this 'declare' instruction. If the declare fails, @@ -657,7 +680,7 @@ void myDeclareMethod(AMQP::Connection *connection) { // create temporary channel to declare a queue AMQP::Channel channel(connection); - + // declare the queue (the channel object is destructed before the // instruction reaches the server, but the AMQP-CPP library can deal // with this) @@ -840,8 +863,8 @@ channel.commitTransaction() }); ```` -Note that AMQP transactions are not as powerful as transactions that are -knows in the database world. It is not possible to wrap all sort of +Note that AMQP transactions are not as powerful as transactions that are +knows in the database world. It is not possible to wrap all sort of operations in a transaction, they are only meaningful for publishing and consuming. @@ -992,4 +1015,3 @@ class that can be directly plugged into libev, libevent and libuv event loops. For performance reasons, we need to investigate if we can limit the number of times an incoming or outgoing messages is copied. - diff --git a/amqpcpp.h b/amqpcpp.h deleted file mode 100644 index a1ac8d5..0000000 --- a/amqpcpp.h +++ /dev/null @@ -1,77 +0,0 @@ -/** - * AMQP.h - * - * Starting point for all includes of the Copernica AMQP library - * - * @documentation public - */ - -#pragma once - -// base C++ include files -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -// base C include files -#include -#include - -// forward declarations -#include - -// utility classes -#include -#include -#include -#include -#include -#include -#include - -// amqp types -#include -#include -#include -#include -#include -#include -#include -#include - -// envelope for publishing and consuming -#include -#include -#include - -// mid level includes -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include diff --git a/appveyor.yml b/appveyor.yml new file mode 100644 index 0000000..5fa95c7 --- /dev/null +++ b/appveyor.yml @@ -0,0 +1,19 @@ +version: '1.0.{build}' + +image: Visual Studio 2017 + +platform: + - x64 + +configuration: + - Release + - Debug + +install: + - git submodule update --init --recursive + +before_build: + - cmake -G "Visual Studio 15 2017 Win64" . + +build: + project: $(APPVEYOR_BUILD_FOLDER)\amqpcpp.sln \ No newline at end of file diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt new file mode 100644 index 0000000..beee70e --- /dev/null +++ b/examples/CMakeLists.txt @@ -0,0 +1,32 @@ + + +################################### +# Boost +################################### + +add_executable(amqpcpp_boost_example libboostasio.cpp) + +add_dependencies(amqpcpp_boost_example amqpcpp) + +target_link_libraries(amqpcpp_boost_example amqpcpp boost_system pthread) + +################################### +# Libev +################################### + +add_executable(amqpcpp_libev_example libev.cpp) + +add_dependencies(amqpcpp_libev_example amqpcpp) + +target_link_libraries(amqpcpp_libev_example amqpcpp ev pthread) + + +################################### +# Libuv +################################### + +add_executable(amqpcpp_libuv_example libuv.cpp) + +add_dependencies(amqpcpp_libuv_example amqpcpp) + +target_link_libraries(amqpcpp_libuv_example amqpcpp uv pthread) \ No newline at end of file diff --git a/tests/libboostasio.cpp b/examples/libboostasio.cpp similarity index 100% rename from tests/libboostasio.cpp rename to examples/libboostasio.cpp diff --git a/tests/libev.cpp b/examples/libev.cpp similarity index 100% rename from tests/libev.cpp rename to examples/libev.cpp diff --git a/tests/libevent.cpp b/examples/libevent.cpp similarity index 100% rename from tests/libevent.cpp rename to examples/libevent.cpp diff --git a/tests/libuv.cpp b/examples/libuv.cpp similarity index 100% rename from tests/libuv.cpp rename to examples/libuv.cpp diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt deleted file mode 100644 index 140f8f2..0000000 --- a/include/CMakeLists.txt +++ /dev/null @@ -1,54 +0,0 @@ -add_sources( -address.h -addresses.h -array.h -booleanset.h -buffer.h -bytebuffer.h -callbacks.h -channel.h -channelimpl.h -classes.h -connection.h -connectionhandler.h -connectionimpl.h -copiedbuffer.h -decimalfield.h -deferred.h -deferredcancel.h -deferredconsumer.h -deferredconsumerbase.h -deferreddelete.h -deferredget.h -deferredqueue.h -endian.h -entityimpl.h -envelope.h -exception.h -exchangetype.h -field.h -fieldproxy.h -flags.h -frame.h -libboostasio.h -libev.h -libevent.h -libuv.h -login.h -message.h -metadata.h -monitor.h -numericfield.h -outbuffer.h -protocolexception.h -receivedframe.h -stack_ptr.h -stringfield.h -table.h -tcpchannel.h -tcpconnection.h -tcpdefines.h -tcphandler.h -watchable.h - -) diff --git a/include/amqpcpp.h b/include/amqpcpp.h new file mode 100644 index 0000000..4d7735a --- /dev/null +++ b/include/amqpcpp.h @@ -0,0 +1,84 @@ +/** + * AMQP.h + * + * Starting point for all includes of the Copernica AMQP library + * + * @author Emiel Bruijntjes + * @copyright 2015 - 2018 Copernica BV + */ + +#pragma once + +// base C++ include files +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// base C include files +#include +#include + +// fix strcasecmp on non linux platforms +#if (defined(_WIN16) || defined(_WIN32) || defined(_WIN64) || defined(__WINDOWS__)) && !defined(__CYGWIN__) + #define strcasecmp _stricmp +#endif + +// forward declarations +#include "amqpcpp/classes.h" + +// utility classes +#include "amqpcpp/endian.h" +#include "amqpcpp/buffer.h" +#include "amqpcpp/bytebuffer.h" +#include "amqpcpp/receivedframe.h" +#include "amqpcpp/outbuffer.h" +#include "amqpcpp/watchable.h" +#include "amqpcpp/monitor.h" + +// amqp types +#include "amqpcpp/field.h" +#include "amqpcpp/numericfield.h" +#include "amqpcpp/decimalfield.h" +#include "amqpcpp/stringfield.h" +#include "amqpcpp/booleanset.h" +#include "amqpcpp/fieldproxy.h" +#include "amqpcpp/table.h" +#include "amqpcpp/array.h" + +// envelope for publishing and consuming +#include "amqpcpp/metadata.h" +#include "amqpcpp/envelope.h" +#include "amqpcpp/message.h" + +// mid level includes +#include "amqpcpp/exchangetype.h" +#include "amqpcpp/flags.h" +#include "amqpcpp/callbacks.h" +#include "amqpcpp/deferred.h" +#include "amqpcpp/deferredconsumer.h" +#include "amqpcpp/deferredqueue.h" +#include "amqpcpp/deferreddelete.h" +#include "amqpcpp/deferredcancel.h" +#include "amqpcpp/deferredget.h" +#include "amqpcpp/channelimpl.h" +#include "amqpcpp/channel.h" +#include "amqpcpp/login.h" +#include "amqpcpp/address.h" +#include "amqpcpp/connectionhandler.h" +#include "amqpcpp/connectionimpl.h" +#include "amqpcpp/connection.h" + +// tcp level includes +#include "amqpcpp/linux_tcp.h" + diff --git a/include/address.h b/include/amqpcpp/address.h similarity index 100% rename from include/address.h rename to include/amqpcpp/address.h diff --git a/include/addresses.h b/include/amqpcpp/addresses.h similarity index 100% rename from include/addresses.h rename to include/amqpcpp/addresses.h diff --git a/include/array.h b/include/amqpcpp/array.h similarity index 100% rename from include/array.h rename to include/amqpcpp/array.h diff --git a/include/booleanset.h b/include/amqpcpp/booleanset.h similarity index 100% rename from include/booleanset.h rename to include/amqpcpp/booleanset.h diff --git a/include/buffer.h b/include/amqpcpp/buffer.h similarity index 100% rename from include/buffer.h rename to include/amqpcpp/buffer.h diff --git a/include/bytebuffer.h b/include/amqpcpp/bytebuffer.h similarity index 100% rename from include/bytebuffer.h rename to include/amqpcpp/bytebuffer.h diff --git a/include/callbacks.h b/include/amqpcpp/callbacks.h similarity index 100% rename from include/callbacks.h rename to include/amqpcpp/callbacks.h diff --git a/include/channel.h b/include/amqpcpp/channel.h similarity index 100% rename from include/channel.h rename to include/amqpcpp/channel.h diff --git a/include/channelimpl.h b/include/amqpcpp/channelimpl.h similarity index 100% rename from include/channelimpl.h rename to include/amqpcpp/channelimpl.h diff --git a/include/classes.h b/include/amqpcpp/classes.h similarity index 100% rename from include/classes.h rename to include/amqpcpp/classes.h diff --git a/include/connection.h b/include/amqpcpp/connection.h similarity index 100% rename from include/connection.h rename to include/amqpcpp/connection.h diff --git a/include/connectionhandler.h b/include/amqpcpp/connectionhandler.h similarity index 100% rename from include/connectionhandler.h rename to include/amqpcpp/connectionhandler.h diff --git a/include/connectionimpl.h b/include/amqpcpp/connectionimpl.h similarity index 100% rename from include/connectionimpl.h rename to include/amqpcpp/connectionimpl.h diff --git a/include/copiedbuffer.h b/include/amqpcpp/copiedbuffer.h similarity index 100% rename from include/copiedbuffer.h rename to include/amqpcpp/copiedbuffer.h diff --git a/include/decimalfield.h b/include/amqpcpp/decimalfield.h similarity index 100% rename from include/decimalfield.h rename to include/amqpcpp/decimalfield.h diff --git a/include/deferred.h b/include/amqpcpp/deferred.h similarity index 100% rename from include/deferred.h rename to include/amqpcpp/deferred.h diff --git a/include/deferredcancel.h b/include/amqpcpp/deferredcancel.h similarity index 100% rename from include/deferredcancel.h rename to include/amqpcpp/deferredcancel.h diff --git a/include/deferredconsumer.h b/include/amqpcpp/deferredconsumer.h similarity index 100% rename from include/deferredconsumer.h rename to include/amqpcpp/deferredconsumer.h diff --git a/include/deferredconsumerbase.h b/include/amqpcpp/deferredconsumerbase.h similarity index 100% rename from include/deferredconsumerbase.h rename to include/amqpcpp/deferredconsumerbase.h diff --git a/include/deferreddelete.h b/include/amqpcpp/deferreddelete.h similarity index 100% rename from include/deferreddelete.h rename to include/amqpcpp/deferreddelete.h diff --git a/include/deferredget.h b/include/amqpcpp/deferredget.h similarity index 100% rename from include/deferredget.h rename to include/amqpcpp/deferredget.h diff --git a/include/deferredqueue.h b/include/amqpcpp/deferredqueue.h similarity index 100% rename from include/deferredqueue.h rename to include/amqpcpp/deferredqueue.h diff --git a/include/endian.h b/include/amqpcpp/endian.h similarity index 97% rename from include/endian.h rename to include/amqpcpp/endian.h index 8662464..f1adab9 100644 --- a/include/endian.h +++ b/include/amqpcpp/endian.h @@ -72,7 +72,7 @@ #define be32toh(x) ntohl(x) #define le32toh(x) (x) -#define htobe64(x) htonll(x) +#define htobe64(x) ((1==htonl(1)) ? (x) : ((uint64_t)htonl((x) & 0xFFFFFFFF) << 32) | htonl((x) >> 32)) #define htole64(x) (x) #define be64toh(x) ntohll(x) #define le64toh(x) (x) diff --git a/include/entityimpl.h b/include/amqpcpp/entityimpl.h similarity index 100% rename from include/entityimpl.h rename to include/amqpcpp/entityimpl.h diff --git a/include/envelope.h b/include/amqpcpp/envelope.h similarity index 100% rename from include/envelope.h rename to include/amqpcpp/envelope.h diff --git a/include/exception.h b/include/amqpcpp/exception.h similarity index 100% rename from include/exception.h rename to include/amqpcpp/exception.h diff --git a/include/exchangetype.h b/include/amqpcpp/exchangetype.h similarity index 100% rename from include/exchangetype.h rename to include/amqpcpp/exchangetype.h diff --git a/include/field.h b/include/amqpcpp/field.h similarity index 100% rename from include/field.h rename to include/amqpcpp/field.h diff --git a/include/fieldproxy.h b/include/amqpcpp/fieldproxy.h similarity index 100% rename from include/fieldproxy.h rename to include/amqpcpp/fieldproxy.h diff --git a/include/flags.h b/include/amqpcpp/flags.h similarity index 100% rename from include/flags.h rename to include/amqpcpp/flags.h diff --git a/include/frame.h b/include/amqpcpp/frame.h similarity index 100% rename from include/frame.h rename to include/amqpcpp/frame.h diff --git a/include/libboostasio.h b/include/amqpcpp/libboostasio.h similarity index 64% rename from include/libboostasio.h rename to include/amqpcpp/libboostasio.h index f0d4ed4..18b2a10 100644 --- a/include/libboostasio.h +++ b/include/amqpcpp/libboostasio.h @@ -25,36 +25,16 @@ #include #include #include +#include +#include "amqpcpp/linux_tcp.h" -/////////////////////////////////////////////////////////////////// -#define STRAND_SOCKET_HANDLER(_fn) \ -[fn = _fn, strand = _strand](const boost::system::error_code &ec, \ - const std::size_t bytes_transferred) \ -{ \ - const std::shared_ptr apStrand = strand.lock(); \ - if (!apStrand) \ - { \ - fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled),std::size_t{0}); \ - return; \ - } \ - \ - apStrand->dispatch(boost::bind(fn,ec,bytes_transferred)); \ -} - -/////////////////////////////////////////////////////////////////// -#define STRAND_TIMER_HANDLER(_fn) \ -[fn = _fn, strand = _strand](const boost::system::error_code &ec) \ -{ \ - const std::shared_ptr apStrand = strand.lock(); \ - if (!apStrand) \ - { \ - fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled)); \ - return; \ - } \ - \ - apStrand->dispatch(boost::bind(fn,ec)); \ -} +// C++17 has 'weak_from_this()' support. +#if __cplusplus >= 201701L +#define PTR_FROM_THIS weak_from_this +#else +#define PTR_FROM_THIS shared_from_this +#endif /** * Set up namespace @@ -82,11 +62,13 @@ private: */ boost::asio::io_service & _ioservice; + typedef std::weak_ptr strand_weak_ptr; + /** * The boost asio io_service::strand managed pointer. * @var class std::shared_ptr */ - std::weak_ptr _strand; + strand_weak_ptr _wpstrand; /** * The boost tcp socket. @@ -120,6 +102,66 @@ private: */ bool _write_pending{false}; + using handler_cb = boost::function; + using io_handler = boost::function; + + /** + * Builds a io handler callback that executes the io callback in a strand. + * @param io_handler The handler callback to dispatch + * @return handler_cb A function wrapping the execution of the handler function in a io_service::strand. + */ + handler_cb get_dispatch_wrapper(io_handler fn) + { + const strand_weak_ptr wpstrand = _wpstrand; + + return [fn, wpstrand](const boost::system::error_code &ec, const std::size_t bytes_transferred) + { + const strand_shared_ptr strand = wpstrand.lock(); + if (!strand) + { + fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled), std::size_t{0}); + return; + } + strand->dispatch(boost::bind(fn, ec, bytes_transferred)); + }; + } + + /** + * Binds and returns a read handler for the io operation. + * @param connection The connection being watched. + * @param fd The file descripter being watched. + * @return handler callback + */ + handler_cb get_read_handler(TcpConnection *const connection, const int fd) + { + auto fn = boost::bind(&Watcher::read_handler, + this, + _1, + _2, + PTR_FROM_THIS(), + connection, + fd); + return get_dispatch_wrapper(fn); + } + + /** + * Binds and returns a read handler for the io operation. + * @param connection The connection being watched. + * @param fd The file descripter being watched. + * @return handler callback + */ + handler_cb get_write_handler(TcpConnection *const connection, const int fd) + { + auto fn = boost::bind(&Watcher::write_handler, + this, + _1, + _2, + PTR_FROM_THIS(), + connection, + fd); + return get_dispatch_wrapper(fn); + } + /** * Handler method that is called by boost's io_service when the socket pumps a read event. * @param ec The status of the callback. @@ -147,21 +189,10 @@ private: connection->process(fd, AMQP::readable); _read_pending = true; - - _socket.async_read_some(boost::asio::null_buffers(), - STRAND_SOCKET_HANDLER( - boost::bind(&Watcher::read_handler, - this, - boost::arg<1>(), - boost::arg<2>(), -// C++17 has 'weak_from_this()' support. -#if __cplusplus >= 201701L - weak_from_this(), -#else - shared_from_this(), -#endif - connection, - fd))); + + _socket.async_read_some( + boost::asio::null_buffers(), + get_read_handler(connection, fd)); } } @@ -193,20 +224,9 @@ private: _write_pending = true; - _socket.async_write_some(boost::asio::null_buffers(), - STRAND_SOCKET_HANDLER( - boost::bind(&Watcher::write_handler, - this, - boost::arg<1>(), - boost::arg<2>(), -// C++17 has 'weak_from_this()' support. -#if __cplusplus >= 201701L - weak_from_this(), -#else - shared_from_this(), -#endif - connection, - fd))); + _socket.async_write_some( + boost::asio::null_buffers(), + get_write_handler(connection, fd)); } } @@ -215,14 +235,14 @@ private: * Constructor- initialises the watcher and assigns the filedescriptor to * a boost socket for monitoring. * @param io_service The boost io_service - * @param strand A weak pointer to a io_service::strand instance. + * @param wpstrand A weak pointer to a io_service::strand instance. * @param fd The filedescriptor being watched */ Watcher(boost::asio::io_service &io_service, - const std::weak_ptr strand, + const strand_weak_ptr wpstrand, const int fd) : _ioservice(io_service), - _strand(strand), + _wpstrand(wpstrand), _socket(_ioservice) { _socket.assign(fd); @@ -262,20 +282,9 @@ private: { _read_pending = true; - _socket.async_read_some(boost::asio::null_buffers(), - STRAND_SOCKET_HANDLER( - boost::bind(&Watcher::read_handler, - this, - boost::arg<1>(), - boost::arg<2>(), -// C++17 has 'weak_from_this()' support. -#if __cplusplus >= 201701L - weak_from_this(), -#else - shared_from_this(), -#endif - connection, - fd))); + _socket.async_read_some( + boost::asio::null_buffers(), + get_read_handler(connection, fd)); } // 2. Handle writes? @@ -286,20 +295,9 @@ private: { _write_pending = true; - _socket.async_write_some(boost::asio::null_buffers(), - STRAND_SOCKET_HANDLER( - boost::bind(&Watcher::write_handler, - this, - boost::arg<1>(), - boost::arg<2>(), -// C++17 has 'weak_from_this()' support. -#if __cplusplus >= 201701L - weak_from_this(), -#else - shared_from_this(), -#endif - connection, - fd))); + _socket.async_write_some( + boost::asio::null_buffers(), + get_write_handler(connection, fd)); } } }; @@ -317,11 +315,13 @@ private: */ boost::asio::io_service & _ioservice; + typedef std::weak_ptr strand_weak_ptr; + /** * The boost asio io_service::strand managed pointer. * @var class std::shared_ptr */ - std::weak_ptr _strand; + strand_weak_ptr _wpstrand; /** * The boost asynchronous deadline timer. @@ -329,11 +329,42 @@ private: */ boost::asio::deadline_timer _timer; + using handler_fn = boost::function; + /** + * Binds and returns a lamba function handler for the io operation. + * @param connection The connection being watched. + * @param timeout The file descripter being watched. + * @return handler callback + */ + handler_fn get_handler(TcpConnection *const connection, const uint16_t timeout) + { + const auto fn = boost::bind(&Timer::timeout, + this, + _1, + PTR_FROM_THIS(), + connection, + timeout); + + const strand_weak_ptr wpstrand = _wpstrand; + + return [fn, wpstrand](const boost::system::error_code &ec) + { + const strand_shared_ptr strand = wpstrand.lock(); + if (!strand) + { + fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled)); + return; + } + strand->dispatch(boost::bind(fn, ec)); + }; + } + /** * Callback method that is called by libev when the timer expires + * @param ec error code returned from loop * @param loop The loop in which the event was triggered - * @param timer Internal timer object - * @param revents The events that triggered this call + * @param connection + * @param timeout */ void timeout(const boost::system::error_code &ec, std::weak_ptr awpThis, @@ -357,18 +388,7 @@ private: _timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout)); // Posts the timer event - _timer.async_wait(STRAND_TIMER_HANDLER( - boost::bind(&Timer::timeout, - this, - boost::arg<1>(), -// C++17 has 'weak_from_this()' support. -#if __cplusplus >= 201701L - weak_from_this(), -#else - shared_from_this(), -#endif - connection, - timeout))); + _timer.async_wait(get_handler(connection, timeout)); } } @@ -385,13 +405,13 @@ private: /** * Constructor * @param io_service The boost asio io_service. - * @param strand A weak pointer to a io_service::strand instance. + * @param wpstrand A weak pointer to a io_service::strand instance. */ Timer(boost::asio::io_service &io_service, - const std::weak_ptr strand) : - _ioservice(io_service), - _strand(strand), - _timer(_ioservice) + const strand_weak_ptr wpstrand) : + _ioservice(io_service), + _wpstrand(wpstrand), + _timer(_ioservice) { } @@ -423,19 +443,11 @@ private: // stop timer in case it was already set stop(); + // Reschedule the timer for the future: _timer.expires_from_now(boost::posix_time::seconds(timeout)); - _timer.async_wait(STRAND_TIMER_HANDLER( - boost::bind(&Timer::timeout, - this, - boost::arg<1>(), -// C++17 has 'weak_from_this()' support. -#if __cplusplus >= 201701L - weak_from_this(), -#else - shared_from_this(), -#endif - connection, - timeout))); + + // Posts the timer event + _timer.async_wait(get_handler(connection, timeout)); } }; @@ -445,11 +457,13 @@ private: */ boost::asio::io_service & _ioservice; + typedef std::shared_ptr strand_shared_ptr; + /** * The boost asio io_service::strand managed pointer. * @var class std::shared_ptr */ - std::shared_ptr _strand; + strand_shared_ptr _strand; /** diff --git a/include/libev.h b/include/amqpcpp/libev.h similarity index 99% rename from include/libev.h rename to include/amqpcpp/libev.h index 9e6ea1a..1fc1f32 100644 --- a/include/libev.h +++ b/include/amqpcpp/libev.h @@ -8,7 +8,7 @@ * Compile with: "g++ -std=c++11 libev.cpp -lamqpcpp -lev -lpthread" * * @author Emiel Bruijntjes - * @copyright 2015 Copernica BV + * @copyright 2015 - 2018 Copernica BV */ /** @@ -21,6 +21,8 @@ */ #include +#include "amqpcpp/linux_tcp.h" + /** * Set up namespace */ diff --git a/include/libevent.h b/include/amqpcpp/libevent.h similarity index 100% rename from include/libevent.h rename to include/amqpcpp/libevent.h diff --git a/include/libuv.h b/include/amqpcpp/libuv.h similarity index 99% rename from include/libuv.h rename to include/amqpcpp/libuv.h index d48303f..eebf204 100644 --- a/include/libuv.h +++ b/include/amqpcpp/libuv.h @@ -21,6 +21,8 @@ */ #include +#include "amqpcpp/linux_tcp.h" + /** * Set up namespace */ diff --git a/include/amqpcpp/linux_tcp.h b/include/amqpcpp/linux_tcp.h new file mode 100644 index 0000000..c8b0d70 --- /dev/null +++ b/include/amqpcpp/linux_tcp.h @@ -0,0 +1,3 @@ +#include "linux_tcp/tcphandler.h" +#include "linux_tcp/tcpconnection.h" +#include "linux_tcp/tcpchannel.h" diff --git a/include/tcpchannel.h b/include/amqpcpp/linux_tcp/tcpchannel.h similarity index 100% rename from include/tcpchannel.h rename to include/amqpcpp/linux_tcp/tcpchannel.h diff --git a/include/tcpconnection.h b/include/amqpcpp/linux_tcp/tcpconnection.h similarity index 100% rename from include/tcpconnection.h rename to include/amqpcpp/linux_tcp/tcpconnection.h diff --git a/include/tcpdefines.h b/include/amqpcpp/linux_tcp/tcpdefines.h similarity index 100% rename from include/tcpdefines.h rename to include/amqpcpp/linux_tcp/tcpdefines.h diff --git a/include/tcphandler.h b/include/amqpcpp/linux_tcp/tcphandler.h similarity index 100% rename from include/tcphandler.h rename to include/amqpcpp/linux_tcp/tcphandler.h diff --git a/include/login.h b/include/amqpcpp/login.h similarity index 100% rename from include/login.h rename to include/amqpcpp/login.h diff --git a/include/message.h b/include/amqpcpp/message.h similarity index 92% rename from include/message.h rename to include/amqpcpp/message.h index 5ce4e68..5b1bb02 100644 --- a/include/message.h +++ b/include/amqpcpp/message.h @@ -21,6 +21,7 @@ #include "envelope.h" #include #include +#include /** * Set up namespace @@ -93,10 +94,10 @@ protected: size = std::min(size, _bodySize - _filled); // append more data - memcpy(_body + _filled, buffer, size); + memcpy(_body + _filled, buffer, (size_t)size); // update filled data - _filled += size; + _filled += (size_t)size; } else if (size >= _bodySize) { @@ -107,16 +108,16 @@ protected: else { // allocate the buffer - _body = (char *)malloc(_bodySize); + _body = (char *)malloc((size_t)_bodySize); // remember that the buffer was allocated, so that the destructor can get rid of it _allocated = true; // append more data - memcpy(_body, buffer, std::min(size, _bodySize)); + memcpy(_body, buffer, std::min((size_t)size, (size_t)_bodySize)); // update filled data - _filled = std::min(size, _bodySize); + _filled = std::min((size_t)size, (size_t)_bodySize); } // check if we're done diff --git a/include/metadata.h b/include/amqpcpp/metadata.h similarity index 91% rename from include/metadata.h rename to include/amqpcpp/metadata.h index 4b1a04e..8b18d99 100644 --- a/include/metadata.h +++ b/include/amqpcpp/metadata.h @@ -302,20 +302,20 @@ public: // the result (2 for the two boolean sets) uint32_t result = 2; - if (hasExpiration()) result += _expiration.size(); - if (hasReplyTo()) result += _replyTo.size(); - if (hasCorrelationID()) result += _correlationID.size(); - if (hasPriority()) result += _priority.size(); - if (hasDeliveryMode()) result += _deliveryMode.size(); - if (hasHeaders()) result += _headers.size(); - if (hasContentEncoding()) result += _contentEncoding.size(); - if (hasContentType()) result += _contentType.size(); - if (hasClusterID()) result += _clusterID.size(); - if (hasAppID()) result += _appID.size(); - if (hasUserID()) result += _userID.size(); - if (hasTypeName()) result += _typeName.size(); - if (hasTimestamp()) result += _timestamp.size(); - if (hasMessageID()) result += _messageID.size(); + if (hasExpiration()) result += (uint32_t)_expiration.size(); + if (hasReplyTo()) result += (uint32_t)_replyTo.size(); + if (hasCorrelationID()) result += (uint32_t)_correlationID.size(); + if (hasPriority()) result += (uint32_t)_priority.size(); + if (hasDeliveryMode()) result += (uint32_t)_deliveryMode.size(); + if (hasHeaders()) result += (uint32_t)_headers.size(); + if (hasContentEncoding()) result += (uint32_t)_contentEncoding.size(); + if (hasContentType()) result += (uint32_t)_contentType.size(); + if (hasClusterID()) result += (uint32_t)_clusterID.size(); + if (hasAppID()) result += (uint32_t)_appID.size(); + if (hasUserID()) result += (uint32_t)_userID.size(); + if (hasTypeName()) result += (uint32_t)_typeName.size(); + if (hasTimestamp()) result += (uint32_t)_timestamp.size(); + if (hasMessageID()) result += (uint32_t)_messageID.size(); // done return result; diff --git a/include/monitor.h b/include/amqpcpp/monitor.h similarity index 100% rename from include/monitor.h rename to include/amqpcpp/monitor.h diff --git a/include/numericfield.h b/include/amqpcpp/numericfield.h similarity index 89% rename from include/numericfield.h rename to include/amqpcpp/numericfield.h index 29f2f42..a54c3f8 100644 --- a/include/numericfield.h +++ b/include/amqpcpp/numericfield.h @@ -1,6 +1,6 @@ /** * Numeric field types for AMQP - * + * * @copyright 2014 Copernica BV */ @@ -42,6 +42,8 @@ private: T _value; public: + using Type = T; + /** * Default constructor, assign 0 */ @@ -116,14 +118,14 @@ public: * Get the value * @return mixed */ - operator uint8_t () const override { return _value; } - operator uint16_t() const override { return _value; } - operator uint32_t() const override { return _value; } - operator uint64_t() const override { return _value; } - operator int8_t () const override { return _value; } - operator int16_t () const override { return _value; } - operator int32_t () const override { return _value; } - operator int64_t () const override { return _value; } + operator uint8_t () const override { return (uint8_t)_value; } + operator uint16_t() const override { return (uint16_t)_value; } + operator uint32_t() const override { return (uint32_t)_value; } + operator uint64_t() const override { return (uint64_t)_value; } + operator int8_t () const override { return (int8_t)_value; } + operator int16_t () const override { return (int16_t)_value; } + operator int32_t () const override { return (int32_t)_value; } + operator int64_t () const override { return (int64_t)_value; } /** * Get the value @@ -213,4 +215,3 @@ typedef NumericField Double; * end namespace */ } - diff --git a/include/outbuffer.h b/include/amqpcpp/outbuffer.h similarity index 100% rename from include/outbuffer.h rename to include/amqpcpp/outbuffer.h diff --git a/include/protocolexception.h b/include/amqpcpp/protocolexception.h similarity index 100% rename from include/protocolexception.h rename to include/amqpcpp/protocolexception.h diff --git a/include/receivedframe.h b/include/amqpcpp/receivedframe.h similarity index 100% rename from include/receivedframe.h rename to include/amqpcpp/receivedframe.h diff --git a/include/stack_ptr.h b/include/amqpcpp/stack_ptr.h similarity index 100% rename from include/stack_ptr.h rename to include/amqpcpp/stack_ptr.h diff --git a/include/stringfield.h b/include/amqpcpp/stringfield.h similarity index 97% rename from include/stringfield.h rename to include/amqpcpp/stringfield.h index bab6310..edcee29 100644 --- a/include/stringfield.h +++ b/include/amqpcpp/stringfield.h @@ -119,7 +119,7 @@ public: virtual size_t size() const override { // find out size of the size parameter - T size(_data.size()); + T size((typename T::Type)_data.size()); // size of the uint8 or uint32 + the actual string size return size.size() + _data.size(); @@ -160,7 +160,7 @@ public: virtual void fill(OutBuffer& buffer) const override { // create size - T size(_data.size()); + T size((typename T::Type)_data.size()); // first, write down the size of the string size.fill(buffer); @@ -210,4 +210,3 @@ typedef StringField LongString; * end namespace */ } - diff --git a/include/table.h b/include/amqpcpp/table.h similarity index 100% rename from include/table.h rename to include/amqpcpp/table.h diff --git a/include/watchable.h b/include/amqpcpp/watchable.h similarity index 100% rename from include/watchable.h rename to include/amqpcpp/watchable.h diff --git a/set_cxx_norm.cmake b/set_cxx_norm.cmake deleted file mode 100644 index da2af82..0000000 --- a/set_cxx_norm.cmake +++ /dev/null @@ -1,55 +0,0 @@ -# Version -cmake_minimum_required(VERSION 2.6.3) - -set(CXX_NORM_CXX98 1) # C++98 -set(CXX_NORM_CXX03 2) # C++03 -set(CXX_NORM_CXX11 3) # C++11 - -# - Set the wanted C++ norm -# Adds the good argument to the command line in function of the compiler -# Currently only works with g++ and clang++ -macro(set_cxx_norm NORM) - - # Extract c++ compiler --version output - exec_program( - ${CMAKE_CXX_COMPILER} - ARGS --version - OUTPUT_VARIABLE _compiler_output - ) - # Keep only the first line - string(REGEX REPLACE - "(\n.*$)" - "" - cxx_compiler_version "${_compiler_output}" - ) - # Extract the version number - string(REGEX REPLACE - "([^0-9.])|([0-9.][^0-9.])" - "" - cxx_compiler_version "${cxx_compiler_version}" - ) - - if(CMAKE_COMPILER_IS_GNUCXX) - - if(${NORM} EQUAL ${CXX_NORM_CXX98}) - add_definitions("-std=c++98") - elseif(${NORM} EQUAL ${CXX_NORM_CXX03}) - add_definitions("-std=c++03") - elseif(${NORM} EQUAL ${CXX_NORM_CXX11}) - if(${cxx_compiler_version} VERSION_LESS "4.7.0") - add_definitions("-std=c++0x") - else() - add_definitions("-std=c++11") - endif() - endif() - - elseif(${CMAKE_CXX_COMPILER_ID} STREQUAL "Clang") - - if(${NORM} EQUAL ${CXX_NORM_CXX11}) - add_definitions("-std=c++11") - endif() - - endif() - -endmacro() - diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index eae0c22..6cc8e1a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,100 +1,91 @@ add_sources( -addressinfo.h -array.cpp -basicackframe.h -basiccancelframe.h -basiccancelokframe.h -basicconsumeframe.h -basicconsumeokframe.h -basicdeliverframe.h -basicframe.h -basicgetemptyframe.h -basicgetframe.h -basicgetokframe.h -basicheaderframe.h -basicnackframe.h -basicpublishframe.h -basicqosframe.h -basicqosokframe.h -basicrecoverasyncframe.h -basicrecoverframe.h -basicrecoverokframe.h -basicrejectframe.h -basicreturnframe.h -bodyframe.h -channelcloseframe.h -channelcloseokframe.h -channelflowframe.h -channelflowokframe.h -channelframe.h -channelimpl.cpp -channelopenframe.h -channelopenokframe.h -connectioncloseframe.h -connectioncloseokframe.h -connectionframe.h -connectionimpl.cpp -connectionopenframe.h -connectionopenokframe.h -connectionsecureframe.h -connectionsecureokframe.h -connectionstartframe.h -connectionstartokframe.h -connectiontuneframe.h -connectiontuneokframe.h -consumedmessage.h -deferredcancel.cpp -deferredconsumer.cpp -deferredconsumerbase.cpp -deferredget.cpp -exchangebindframe.h -exchangebindokframe.h -exchangedeclareframe.h -exchangedeclareokframe.h -exchangedeleteframe.h -exchangedeleteokframe.h -exchangeframe.h -exchangeunbindframe.h -exchangeunbindokframe.h -extframe.h -field.cpp -flags.cpp -framecheck.h -headerframe.h -heartbeatframe.h -includes.h -methodframe.h -passthroughbuffer.h -pipe.h -protocolheaderframe.h -queuebindframe.h -queuebindokframe.h -queuedeclareframe.h -queuedeclareokframe.h -queuedeleteframe.h -queuedeleteokframe.h -queueframe.h -queuepurgeframe.h -queuepurgeokframe.h -queueunbindframe.h -queueunbindokframe.h -receivedframe.cpp -reducedbuffer.h -returnedmessage.h -table.cpp -tcpclosed.h -tcpconnected.h -tcpconnection.cpp -tcpinbuffer.h -tcpoutbuffer.h -tcpresolver.h -tcpstate.h -transactioncommitframe.h -transactioncommitokframe.h -transactionframe.h -transactionrollbackframe.h -transactionrollbackokframe.h -transactionselectframe.h -transactionselectokframe.h -watchable.cpp + array.cpp + basicackframe.h + basiccancelframe.h + basiccancelokframe.h + basicconsumeframe.h + basicconsumeokframe.h + basicdeliverframe.h + basicframe.h + basicgetemptyframe.h + basicgetframe.h + basicgetokframe.h + basicheaderframe.h + basicnackframe.h + basicpublishframe.h + basicqosframe.h + basicqosokframe.h + basicrecoverasyncframe.h + basicrecoverframe.h + basicrecoverokframe.h + basicrejectframe.h + basicreturnframe.h + bodyframe.h + channelcloseframe.h + channelcloseokframe.h + channelflowframe.h + channelflowokframe.h + channelframe.h + channelimpl.cpp + channelopenframe.h + channelopenokframe.h + connectioncloseframe.h + connectioncloseokframe.h + connectionframe.h + connectionimpl.cpp + connectionopenframe.h + connectionopenokframe.h + connectionsecureframe.h + connectionsecureokframe.h + connectionstartframe.h + connectionstartokframe.h + connectiontuneframe.h + connectiontuneokframe.h + consumedmessage.h + deferredcancel.cpp + deferredconsumer.cpp + deferredconsumerbase.cpp + deferredget.cpp + exchangebindframe.h + exchangebindokframe.h + exchangedeclareframe.h + exchangedeclareokframe.h + exchangedeleteframe.h + exchangedeleteokframe.h + exchangeframe.h + exchangeunbindframe.h + exchangeunbindokframe.h + extframe.h + field.cpp + flags.cpp + framecheck.h + headerframe.h + heartbeatframe.h + includes.h + methodframe.h + passthroughbuffer.h + protocolheaderframe.h + queuebindframe.h + queuebindokframe.h + queuedeclareframe.h + queuedeclareokframe.h + queuedeleteframe.h + queuedeleteokframe.h + queueframe.h + queuepurgeframe.h + queuepurgeokframe.h + queueunbindframe.h + queueunbindokframe.h + receivedframe.cpp + reducedbuffer.h + returnedmessage.h + table.cpp + transactioncommitframe.h + transactioncommitokframe.h + transactionframe.h + transactionrollbackframe.h + transactionrollbackokframe.h + transactionselectframe.h + transactionselectokframe.h + watchable.cpp ) diff --git a/src/Makefile b/src/Makefile index 7448733..3471fd5 100644 --- a/src/Makefile +++ b/src/Makefile @@ -1,11 +1,11 @@ CPP = g++ RM = rm -f -CPPFLAGS = -Wall -c -I. -std=c++11 -MD +CPPFLAGS = -Wall -c -I../include -std=c++11 -MD LD = g++ LD_FLAGS = -Wall -shared SHARED_LIB = lib$(LIBRARY_NAME).so.$(VERSION) STATIC_LIB = lib$(LIBRARY_NAME).a.$(VERSION) -SOURCES = $(wildcard *.cpp) +SOURCES = $(wildcard *.cpp) $(wildcard linux_tcp/*.cpp) SHARED_OBJECTS = $(SOURCES:%.cpp=%.o) STATIC_OBJECTS = $(SOURCES:%.cpp=%.s.o) DEPENDENCIES = $(SOURCES:%.cpp=%.d) @@ -53,4 +53,3 @@ ${SHARED_OBJECTS}: ${STATIC_OBJECTS}: ${CPP} ${CPPFLAGS} -o $@ ${@:%.s.o=%.cpp} - diff --git a/src/array.cpp b/src/array.cpp index d04c865..9017faf 100644 --- a/src/array.cpp +++ b/src/array.cpp @@ -29,7 +29,7 @@ Array::Array(ReceivedFrame &frame) if (!field) continue; // less bytes to read - charsToRead -= field->size(); + charsToRead -= (uint32_t)field->size(); // add the additional field _fields.push_back(std::shared_ptr(field)); @@ -76,7 +76,7 @@ const Field &Array::get(uint8_t index) const */ uint32_t Array::count() const { - return _fields.size(); + return (uint32_t)_fields.size(); } /** diff --git a/src/basiccancelframe.h b/src/basiccancelframe.h index 6dd26dd..cdcf916 100644 --- a/src/basiccancelframe.h +++ b/src/basiccancelframe.h @@ -59,7 +59,7 @@ public: * @param noWait whether to wait for a response. */ BasicCancelFrame(uint16_t channel, const std::string& consumerTag, bool noWait = false) : - BasicFrame(channel, consumerTag.size() + 2), // 1 for extra string size, 1 for bool + BasicFrame(channel, (uint32_t)(consumerTag.size() + 2)), // 1 for extra string size, 1 for bool _consumerTag(consumerTag), _noWait(noWait) {} diff --git a/src/basiccancelokframe.h b/src/basiccancelokframe.h index 192417f..a894c5c 100644 --- a/src/basiccancelokframe.h +++ b/src/basiccancelokframe.h @@ -53,7 +53,7 @@ public: * @param consumerTag holds the consumertag specified by client or server */ BasicCancelOKFrame(uint16_t channel, std::string& consumerTag) : - BasicFrame(channel, consumerTag.length() + 1), // add 1 byte for encoding the size of consumer tag + BasicFrame(channel, (uint32_t)(consumerTag.length() + 1)), // add 1 byte for encoding the size of consumer tag _consumerTag(consumerTag) {} diff --git a/src/basicconsumeframe.h b/src/basicconsumeframe.h index 7e57eee..39a8930 100644 --- a/src/basicconsumeframe.h +++ b/src/basicconsumeframe.h @@ -86,7 +86,7 @@ public: * @param filter additional arguments */ BasicConsumeFrame(uint16_t channel, const std::string& queueName, const std::string& consumerTag, bool noLocal = false, bool noAck = false, bool exclusive = false, bool noWait = false, const Table& filter = {}) : - BasicFrame(channel, (queueName.length() + consumerTag.length() + 5 + filter.size())), // size of vars, +1 for each shortstring size, +1 for bools, +2 for deprecated value + BasicFrame(channel, (uint32_t)(queueName.length() + consumerTag.length() + 5 + filter.size())), // size of vars, +1 for each shortstring size, +1 for bools, +2 for deprecated value _queueName(queueName), _consumerTag(consumerTag), _bools(noLocal, noAck, exclusive, noWait), diff --git a/src/basicconsumeokframe.h b/src/basicconsumeokframe.h index 927fa38..9ec6397 100644 --- a/src/basicconsumeokframe.h +++ b/src/basicconsumeokframe.h @@ -43,7 +43,7 @@ public: * @param consumerTag consumertag specified by client of provided by server */ BasicConsumeOKFrame(uint16_t channel, const std::string& consumerTag) : - BasicFrame(channel, consumerTag.length() + 1), // length of string + 1 for encoding of stringsize + BasicFrame(channel, (uint32_t)(consumerTag.length() + 1)), // length of string + 1 for encoding of stringsize _consumerTag(consumerTag) {} diff --git a/src/basicdeliverframe.h b/src/basicdeliverframe.h index 1252969..4d4c408 100644 --- a/src/basicdeliverframe.h +++ b/src/basicdeliverframe.h @@ -13,9 +13,9 @@ * Dependencies */ #include "basicframe.h" -#include "../include/stringfield.h" -#include "../include/booleanset.h" -#include "../include/connectionimpl.h" +#include "amqpcpp/stringfield.h" +#include "amqpcpp/booleanset.h" +#include "amqpcpp/connectionimpl.h" /** * Set up namespace @@ -87,7 +87,7 @@ public: * @param routingKey message routing key */ BasicDeliverFrame(uint16_t channel, const std::string& consumerTag, uint64_t deliveryTag, bool redelivered = false, const std::string& exchange = "", const std::string& routingKey = "") : - BasicFrame(channel, (consumerTag.length() + exchange.length() + routingKey.length() + 12)), + BasicFrame(channel, (uint32_t)(consumerTag.length() + exchange.length() + routingKey.length() + 12)), // length of strings + 1 byte per string for stringsize, 8 bytes for uint64_t and 1 for bools _consumerTag(consumerTag), _deliveryTag(deliveryTag), diff --git a/src/basicgetframe.h b/src/basicgetframe.h index f7244c8..4d112ff 100644 --- a/src/basicgetframe.h +++ b/src/basicgetframe.h @@ -59,7 +59,7 @@ public: * @param noAck whether server expects acknowledgements for messages */ BasicGetFrame(uint16_t channel, const std::string& queue, bool noAck = false) : - BasicFrame(channel, queue.length() + 4), // 1 for bool, 1 for string size, 2 for deprecated field + BasicFrame(channel, (uint32_t)(queue.length() + 4)), // 1 for bool, 1 for string size, 2 for deprecated field _queue(queue), _noAck(noAck) {} diff --git a/src/basicgetokframe.h b/src/basicgetokframe.h index 5cc3a12..0de5ea9 100644 --- a/src/basicgetokframe.h +++ b/src/basicgetokframe.h @@ -76,7 +76,7 @@ public: * @param messageCount number of messages in the queue */ BasicGetOKFrame(uint16_t channel, uint64_t deliveryTag, bool redelivered, const std::string& exchange, const std::string& routingKey, uint32_t messageCount) : - BasicFrame(channel, (exchange.length() + routingKey.length() + 15)), // string length, +1 for each shortsrting length + 8 (uint64_t) + 4 (uint32_t) + 1 (bool) + BasicFrame(channel, (uint32_t)(exchange.length() + routingKey.length() + 15)), // string length, +1 for each shortsrting length + 8 (uint64_t) + 4 (uint32_t) + 1 (bool) _deliveryTag(deliveryTag), _redelivered(redelivered), _exchange(exchange), diff --git a/src/basicheaderframe.h b/src/basicheaderframe.h index 5260621..c6ec1c4 100644 --- a/src/basicheaderframe.h +++ b/src/basicheaderframe.h @@ -13,10 +13,10 @@ * Dependencies */ #include "headerframe.h" -#include "../include/metadata.h" -#include "../include/envelope.h" -#include "../include/connectionimpl.h" -#include "../include/deferredconsumerbase.h" +#include "amqpcpp/metadata.h" +#include "amqpcpp/envelope.h" +#include "amqpcpp/connectionimpl.h" +#include "amqpcpp/deferredconsumerbase.h" /** * Set up namespace diff --git a/src/basicpublishframe.h b/src/basicpublishframe.h index 975c2ab..bbaed57 100644 --- a/src/basicpublishframe.h +++ b/src/basicpublishframe.h @@ -70,7 +70,7 @@ public: * @param immediate request immediate delivery @default = false */ BasicPublishFrame(uint16_t channel, const std::string& exchange = "", const std::string& routingKey = "", bool mandatory = false, bool immediate = false) : - BasicFrame(channel, exchange.length() + routingKey.length() + 5), // 1 extra per string (for the size), 1 for bools, 2 for deprecated field + BasicFrame(channel, (uint32_t)(exchange.length() + routingKey.length() + 5)), // 1 extra per string (for the size), 1 for bools, 2 for deprecated field _exchange(exchange), _routingKey(routingKey), _bools(mandatory, immediate) diff --git a/src/basicreturnframe.h b/src/basicreturnframe.h index 2fedce6..e140e9c 100644 --- a/src/basicreturnframe.h +++ b/src/basicreturnframe.h @@ -67,7 +67,7 @@ public: * @param routingKey message routing key */ BasicReturnFrame(uint16_t channel, int16_t replyCode, const std::string& replyText = "", const std::string& exchange = "", const std::string& routingKey = "") : - BasicFrame(channel, replyText.length() + exchange.length() + routingKey.length() + 5), // 3 for each string (extra size byte), 2 for uint16_t + BasicFrame(channel, (uint32_t)(replyText.length() + exchange.length() + routingKey.length() + 5)), // 3 for each string (extra size byte), 2 for uint16_t _replyCode(replyCode), _replyText(replyText), _exchange(exchange), diff --git a/src/bodyframe.h b/src/bodyframe.h index ba34cd2..ae1621d 100644 --- a/src/bodyframe.h +++ b/src/bodyframe.h @@ -13,8 +13,8 @@ * Dependencies */ #include "extframe.h" -#include "../include/connectionimpl.h" -#include "../include/deferredconsumerbase.h" +#include "amqpcpp/connectionimpl.h" +#include "amqpcpp/deferredconsumerbase.h" /** * Set up namespace diff --git a/src/channelcloseframe.h b/src/channelcloseframe.h index d47d88d..48476af 100644 --- a/src/channelcloseframe.h +++ b/src/channelcloseframe.h @@ -82,7 +82,7 @@ public: * @param failingMethod failing method id if applicable */ ChannelCloseFrame(uint16_t channel, uint16_t code = 0, const std::string& text = "", uint16_t failingClass = 0, uint16_t failingMethod = 0) : - ChannelFrame(channel, (text.length() + 7)), // sizeof code, failingclass, failingmethod (2byte + 2byte + 2byte) + text length + text length byte + ChannelFrame(channel, (uint32_t)(text.length() + 7)), // sizeof code, failingclass, failingmethod (2byte + 2byte + 2byte) + text length + text length byte _code(code), _text(text), _failingClass(failingClass), diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 73471ab..9d32874 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -481,7 +481,7 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin uint64_t chunksize = std::min(static_cast(maxpayload), bytesleft); // send out a body frame - if (!send(BodyFrame(_id, data + bytessent, chunksize))) return false; + if (!send(BodyFrame(_id, data + bytessent, (uint32_t)chunksize))) return false; // channel still valid? if (!monitor.valid()) return false; diff --git a/src/connectioncloseframe.h b/src/connectioncloseframe.h index 17378c0..9828bb5 100644 --- a/src/connectioncloseframe.h +++ b/src/connectioncloseframe.h @@ -82,7 +82,7 @@ public: * @param failingMethod id of the failing method if applicable */ ConnectionCloseFrame(uint16_t code, const std::string &text, uint16_t failingClass = 0, uint16_t failingMethod = 0) : - ConnectionFrame(text.length() + 7), // 1 for extra string byte, 2 for each uint16 + ConnectionFrame((uint32_t)(text.length() + 7)), // 1 for extra string byte, 2 for each uint16 _code(code), _text(text), _failingClass(failingClass), diff --git a/src/connectionimpl.cpp b/src/connectionimpl.cpp index f2bd13c..332e106 100644 --- a/src/connectionimpl.cpp +++ b/src/connectionimpl.cpp @@ -125,7 +125,7 @@ uint64_t ConnectionImpl::parse(const Buffer &buffer) try { // try to recognize the frame - ReducedBuffer reduced_buf(buffer, processed); + ReducedBuffer reduced_buf(buffer, (size_t)processed); ReceivedFrame receivedFrame(reduced_buf, _maxFrame); // do we have the full frame? @@ -146,7 +146,7 @@ uint64_t ConnectionImpl::parse(const Buffer &buffer) // have the initial bytes of the header, we already know how much // data we need for the next frame, otherwise we need at least 7 // bytes for processing the header of the next frame - _expected = receivedFrame.header() ? receivedFrame.totalSize() : 7; + _expected = receivedFrame.header() ? (uint32_t)receivedFrame.totalSize() : 7; // we're ready for now return processed; diff --git a/src/connectionopenframe.h b/src/connectionopenframe.h index ca78dd6..c063357 100644 --- a/src/connectionopenframe.h +++ b/src/connectionopenframe.h @@ -60,7 +60,7 @@ public: * @param vhost name of virtual host to open */ ConnectionOpenFrame(const std::string &vhost) : - ConnectionFrame(vhost.length() + 3), // length of vhost + byte to encode this length + deprecated shortstring size + deprecated bool + ConnectionFrame((uint32_t)(vhost.length() + 3)), // length of vhost + byte to encode this length + deprecated shortstring size + deprecated bool _vhost(vhost), _deprecatedCapabilities(""), _deprecatedInsist() diff --git a/src/connectionsecureframe.h b/src/connectionsecureframe.h index f8564ec..8c128d1 100644 --- a/src/connectionsecureframe.h +++ b/src/connectionsecureframe.h @@ -43,7 +43,7 @@ public: * @param challenge the challenge */ ConnectionSecureFrame(const std::string& challenge) : - ConnectionFrame(challenge.length() + 4), // 4 for the length of the challenge (uint32_t) + ConnectionFrame((uint32_t)(challenge.length() + 4)), // 4 for the length of the challenge (uint32_t) _challenge(challenge) {} diff --git a/src/connectionsecureokframe.h b/src/connectionsecureokframe.h index d5c907b..7867804 100644 --- a/src/connectionsecureokframe.h +++ b/src/connectionsecureokframe.h @@ -43,7 +43,7 @@ public: * @param response the challenge response */ ConnectionSecureOKFrame(const std::string& response) : - ConnectionFrame(response.length() + 4), //response length + uint32_t for encoding the length + ConnectionFrame((uint32_t)(response.length() + 4)), //response length + uint32_t for encoding the length _response(response) {} diff --git a/src/connectionstartframe.h b/src/connectionstartframe.h index 6bea81c..272384e 100644 --- a/src/connectionstartframe.h +++ b/src/connectionstartframe.h @@ -81,7 +81,7 @@ public: * @param locales available locales */ ConnectionStartFrame(uint8_t major, uint8_t minor, const Table& properties, const std::string& mechanisms, const std::string& locales) : - ConnectionFrame((properties.size() + mechanisms.length() + locales.length() + 10)), // 4 for each longstring (size-uint32), 2 major/minor + ConnectionFrame((uint32_t)(properties.size() + mechanisms.length() + locales.length() + 10)), // 4 for each longstring (size-uint32), 2 major/minor _major(major), _minor(minor), _properties(properties), diff --git a/src/connectionstartokframe.h b/src/connectionstartokframe.h index 5ca994e..bc46d5e 100644 --- a/src/connectionstartokframe.h +++ b/src/connectionstartokframe.h @@ -82,7 +82,7 @@ public: * @param locale selected locale. */ ConnectionStartOKFrame(const Table& properties, const std::string& mechanism, const std::string& response, const std::string& locale) : - ConnectionFrame((properties.size() + mechanism.length() + response.length() + locale.length() + 6)), // 1 byte extra per shortstring, 4 per longstring + ConnectionFrame((uint32_t)(properties.size() + mechanism.length() + response.length() + locale.length() + 6)), // 1 byte extra per shortstring, 4 per longstring _properties(properties), _mechanism(mechanism), _response(response), diff --git a/src/deferredconsumerbase.cpp b/src/deferredconsumerbase.cpp index 240842b..09ed0e4 100644 --- a/src/deferredconsumerbase.cpp +++ b/src/deferredconsumerbase.cpp @@ -10,7 +10,7 @@ /** * Dependencies */ -#include "../include/deferredconsumerbase.h" +#include "amqpcpp/deferredconsumerbase.h" #include "basicdeliverframe.h" #include "basicgetokframe.h" #include "basicheaderframe.h" diff --git a/src/exchangebindframe.h b/src/exchangebindframe.h index 1d01161..0b6a092 100644 --- a/src/exchangebindframe.h +++ b/src/exchangebindframe.h @@ -95,7 +95,7 @@ public: * @param arguments */ ExchangeBindFrame(uint16_t channel, const std::string &destination, const std::string &source, const std::string &routingKey, bool noWait, const Table &arguments) : - ExchangeFrame(channel, (destination.length() + source.length() + routingKey.length() + arguments.size() + 6)), // 1 for each string, 1 for booleanset, 2 for deprecated field + ExchangeFrame(channel, (uint32_t)(destination.length() + source.length() + routingKey.length() + arguments.size() + 6)), // 1 for each string, 1 for booleanset, 2 for deprecated field _destination(destination), _source(source), _routingKey(routingKey), diff --git a/src/exchangedeclareframe.h b/src/exchangedeclareframe.h index aa940dd..cb45f01 100644 --- a/src/exchangedeclareframe.h +++ b/src/exchangedeclareframe.h @@ -81,7 +81,7 @@ public: * @param arguments additional arguments */ ExchangeDeclareFrame(uint16_t channel, const std::string& name, const std::string& type, bool passive, bool durable, bool noWait, const Table& arguments) : - ExchangeFrame(channel, (name.length() + type.length() + arguments.size() + 5)), // size of name, type and arguments + 1 (all booleans are stored in 1 byte) + 2 (deprecated short) + 2 (string sizes) + ExchangeFrame(channel, (uint32_t)(name.length() + type.length() + arguments.size() + 5)), // size of name, type and arguments + 1 (all booleans are stored in 1 byte) + 2 (deprecated short) + 2 (string sizes) _name(name), _type(type), _bools(passive, durable, false, false, noWait), diff --git a/src/exchangedeleteframe.h b/src/exchangedeleteframe.h index cdd906f..2fc9e23 100644 --- a/src/exchangedeleteframe.h +++ b/src/exchangedeleteframe.h @@ -73,7 +73,7 @@ public: * @param bool noWait Do not wait for a response */ ExchangeDeleteFrame(uint16_t channel, const std::string& name, bool ifUnused = false, bool noWait = false) : - ExchangeFrame(channel, name.length() + 4), // length of the name, 1 byte for encoding this length, 1 for bools, 2 for deprecated short + ExchangeFrame(channel, (uint32_t)(name.length() + 4)), // length of the name, 1 byte for encoding this length, 1 for bools, 2 for deprecated short _name(name), _bools(ifUnused, noWait) {} diff --git a/src/exchangeunbindframe.h b/src/exchangeunbindframe.h index d92d941..bfd3469 100644 --- a/src/exchangeunbindframe.h +++ b/src/exchangeunbindframe.h @@ -95,7 +95,7 @@ public: * @param arguments */ ExchangeUnbindFrame(uint16_t channel, const std::string &destination, const std::string &source, const std::string &routingKey, bool noWait, const Table &arguments) : - ExchangeFrame(channel, (destination.length() + source.length() + routingKey.length() + arguments.size() + 6)), // 1 for each string, 1 for booleanset, 2 for deprecated field + ExchangeFrame(channel, (uint32_t)(destination.length() + source.length() + routingKey.length() + arguments.size() + 6)), // 1 for each string, 1 for booleanset, 2 for deprecated field _destination(destination), _source(source), _routingKey(routingKey), diff --git a/src/extframe.h b/src/extframe.h index d6de772..513c674 100644 --- a/src/extframe.h +++ b/src/extframe.h @@ -19,8 +19,8 @@ /** * Dependencies */ -#include "../include/frame.h" -#include "../include/receivedframe.h" +#include "amqpcpp/frame.h" +#include "amqpcpp/receivedframe.h" /** * Set up namespace diff --git a/src/framecheck.h b/src/framecheck.h index ab8c959..675cebe 100644 --- a/src/framecheck.h +++ b/src/framecheck.h @@ -50,7 +50,7 @@ public: virtual ~FrameCheck() { // update the number of bytes to skip - _frame->_skip += _size; + _frame->_skip += (uint32_t)_size; } }; diff --git a/src/includes.h b/src/includes.h index 2a10061..122f2d0 100644 --- a/src/includes.h +++ b/src/includes.h @@ -9,7 +9,7 @@ // c and c++ dependencies #include -#include +#include // TODO cstring #include #include #include @@ -21,67 +21,69 @@ #include #include #include -#include -#include -#include -#include -#include + +#include // TODO is this needed + #include #include +// TODO make this nice +#ifdef _MSC_VER +//not #if defined(_WIN32) || defined(_WIN64) because we have strncasecmp in mingw +#define strncasecmp _strnicmp +#define strcasecmp _stricmp +#endif + // forward declarations -#include "../include/classes.h" +#include "amqpcpp/classes.h" // utility classes -#include "../include/endian.h" -#include "../include/buffer.h" -#include "../include/bytebuffer.h" -#include "../include/receivedframe.h" -#include "../include/outbuffer.h" -#include "../include/copiedbuffer.h" -#include "../include/watchable.h" -#include "../include/monitor.h" -#include "../include/tcpdefines.h" +#include "amqpcpp/endian.h" +#include "amqpcpp/buffer.h" +#include "amqpcpp/bytebuffer.h" +#include "amqpcpp/receivedframe.h" +#include "amqpcpp/outbuffer.h" +#include "amqpcpp/copiedbuffer.h" +#include "amqpcpp/watchable.h" +#include "amqpcpp/monitor.h" // amqp types -#include "../include/field.h" -#include "../include/numericfield.h" -#include "../include/decimalfield.h" -#include "../include/stringfield.h" -#include "../include/booleanset.h" -#include "../include/fieldproxy.h" -#include "../include/table.h" -#include "../include/array.h" +#include "amqpcpp/field.h" +#include "amqpcpp/numericfield.h" +#include "amqpcpp/decimalfield.h" +#include "amqpcpp/stringfield.h" +#include "amqpcpp/booleanset.h" +#include "amqpcpp/fieldproxy.h" +#include "amqpcpp/table.h" +#include "amqpcpp/array.h" // envelope for publishing and consuming -#include "../include/metadata.h" -#include "../include/envelope.h" -#include "../include/message.h" +#include "amqpcpp/metadata.h" +#include "amqpcpp/envelope.h" +#include "amqpcpp/message.h" // mid level includes -#include "../include/exchangetype.h" -#include "../include/flags.h" -#include "../include/callbacks.h" -#include "../include/deferred.h" -#include "../include/deferredconsumer.h" -#include "../include/deferredqueue.h" -#include "../include/deferreddelete.h" -#include "../include/deferredcancel.h" -#include "../include/deferredget.h" -#include "../include/channelimpl.h" -#include "../include/channel.h" -#include "../include/login.h" -#include "../include/address.h" -#include "../include/connectionhandler.h" -#include "../include/connectionimpl.h" -#include "../include/connection.h" -#include "../include/tcphandler.h" -#include "../include/tcpconnection.h" +#include "amqpcpp/exchangetype.h" +#include "amqpcpp/flags.h" +#include "amqpcpp/callbacks.h" +#include "amqpcpp/deferred.h" +#include "amqpcpp/deferredconsumer.h" +#include "amqpcpp/deferredqueue.h" +#include "amqpcpp/deferreddelete.h" +#include "amqpcpp/deferredcancel.h" +#include "amqpcpp/deferredget.h" +#include "amqpcpp/channelimpl.h" +#include "amqpcpp/channel.h" +#include "amqpcpp/login.h" +#include "amqpcpp/address.h" +#include "amqpcpp/connectionhandler.h" +#include "amqpcpp/connectionimpl.h" +#include "amqpcpp/connection.h" // classes that are very commonly used -#include "../include/exception.h" -#include "../include/protocolexception.h" -#include "../include/frame.h" +#include "amqpcpp/exception.h" +#include "amqpcpp/protocolexception.h" +#include "amqpcpp/frame.h" #include "extframe.h" #include "methodframe.h" #include "headerframe.h" @@ -91,6 +93,5 @@ #include "queueframe.h" #include "basicframe.h" #include "transactionframe.h" -#include "addressinfo.h" diff --git a/src/linux_tcp/CMakeLists.txt b/src/linux_tcp/CMakeLists.txt new file mode 100644 index 0000000..55d0e16 --- /dev/null +++ b/src/linux_tcp/CMakeLists.txt @@ -0,0 +1,12 @@ +add_sources( + addressinfo.h + includes.h + pipe.h + tcpclosed.h + tcpconnected.h + tcpconnection.cpp + tcpinbuffer.h + tcpoutbuffer.h + tcpresolver.h + tcpstate.h +) diff --git a/src/addressinfo.h b/src/linux_tcp/addressinfo.h similarity index 100% rename from src/addressinfo.h rename to src/linux_tcp/addressinfo.h diff --git a/src/linux_tcp/includes.h b/src/linux_tcp/includes.h new file mode 100644 index 0000000..a83e9bd --- /dev/null +++ b/src/linux_tcp/includes.h @@ -0,0 +1,27 @@ +/** + * Includes.h + * + * The includes that are necessary to compile the optional TCP part of the AMQP library + * This file also holds includes that may not be necessary for including the library + * + * @documentation private + */ +// include files from main library +#include "../includes.h" + +// c and c++ dependencies +#include +#include +#include +#include +#include + +// utility classes +#include "amqpcpp/linux_tcp/tcpdefines.h" + +// mid level includes +#include "amqpcpp/linux_tcp/tcphandler.h" +#include "amqpcpp/linux_tcp/tcpconnection.h" + +// classes that are very commonly used +#include "addressinfo.h" diff --git a/src/pipe.h b/src/linux_tcp/pipe.h similarity index 100% rename from src/pipe.h rename to src/linux_tcp/pipe.h diff --git a/src/tcpclosed.h b/src/linux_tcp/tcpclosed.h similarity index 100% rename from src/tcpclosed.h rename to src/linux_tcp/tcpclosed.h diff --git a/src/tcpconnected.h b/src/linux_tcp/tcpconnected.h similarity index 100% rename from src/tcpconnected.h rename to src/linux_tcp/tcpconnected.h diff --git a/src/tcpconnection.cpp b/src/linux_tcp/tcpconnection.cpp similarity index 100% rename from src/tcpconnection.cpp rename to src/linux_tcp/tcpconnection.cpp diff --git a/src/tcpinbuffer.h b/src/linux_tcp/tcpinbuffer.h similarity index 100% rename from src/tcpinbuffer.h rename to src/linux_tcp/tcpinbuffer.h diff --git a/src/tcpoutbuffer.h b/src/linux_tcp/tcpoutbuffer.h similarity index 100% rename from src/tcpoutbuffer.h rename to src/linux_tcp/tcpoutbuffer.h diff --git a/src/tcpresolver.h b/src/linux_tcp/tcpresolver.h similarity index 100% rename from src/tcpresolver.h rename to src/linux_tcp/tcpresolver.h diff --git a/src/tcpstate.h b/src/linux_tcp/tcpstate.h similarity index 100% rename from src/tcpstate.h rename to src/linux_tcp/tcpstate.h diff --git a/src/passthroughbuffer.h b/src/passthroughbuffer.h index 87509f3..4126eca 100644 --- a/src/passthroughbuffer.h +++ b/src/passthroughbuffer.h @@ -17,7 +17,7 @@ */ #include #include -#include "../include/frame.h" +#include "amqpcpp/frame.h" /** * Set up namespace diff --git a/src/queuebindframe.h b/src/queuebindframe.h index bc4d2e9..8418fe1 100644 --- a/src/queuebindframe.h +++ b/src/queuebindframe.h @@ -87,7 +87,7 @@ public: * @param Table arguments additional arguments */ QueueBindFrame(uint16_t channel, const std::string& name, const std::string& exchange, const std::string& routingKey = "", bool noWait = false, const Table& arguments = {}) : - QueueFrame(channel, (name.length() + exchange.length() + routingKey.length() + arguments.size() + 6) ), // 3 extra per string, 1 for bools, 2 for deprecated field + QueueFrame(channel, (uint32_t)(name.length() + exchange.length() + routingKey.length() + arguments.size() + 6) ), // 3 extra per string, 1 for bools, 2 for deprecated field _name(name), _exchange(exchange), _routingKey(routingKey), diff --git a/src/queuedeclareframe.h b/src/queuedeclareframe.h index 475c8b6..627b17c 100644 --- a/src/queuedeclareframe.h +++ b/src/queuedeclareframe.h @@ -80,7 +80,7 @@ public: * @param Table arguments additional arguments, implementation dependent */ QueueDeclareFrame(uint16_t channel, const std::string& name = "", bool passive = false, bool durable = false, bool exclusive = false, bool autoDelete = false, bool noWait = false, const Table& arguments = {}) : - QueueFrame(channel, (name.length() + arguments.size() + 4 ) ), // 1 extra for string size, 1 for bools, 2 for deprecated value + QueueFrame(channel, (uint32_t)(name.length() + arguments.size() + 4 ) ), // 1 extra for string size, 1 for bools, 2 for deprecated value _name(name), _bools(passive, durable, exclusive, autoDelete, noWait), _arguments(arguments) diff --git a/src/queuedeclareokframe.h b/src/queuedeclareokframe.h index 44821a5..99ad1e8 100644 --- a/src/queuedeclareokframe.h +++ b/src/queuedeclareokframe.h @@ -62,7 +62,7 @@ public: * @param failingMethod failing method id if applicable */ QueueDeclareOKFrame(uint16_t channel, const std::string& name, int32_t messageCount, int32_t consumerCount) : - QueueFrame(channel, name.length() + 9), // 4 per int, 1 for string size + QueueFrame(channel, (uint32_t)(name.length() + 9)), // 4 per int, 1 for string size _name(name), _messageCount(messageCount), _consumerCount(consumerCount) diff --git a/src/queuedeleteframe.h b/src/queuedeleteframe.h index dc9a00a..f76e736 100644 --- a/src/queuedeleteframe.h +++ b/src/queuedeleteframe.h @@ -69,7 +69,7 @@ public: * @param noWait do not wait on response */ QueueDeleteFrame(uint16_t channel, const std::string& name, bool ifUnused = false, bool ifEmpty = false, bool noWait = false) : - QueueFrame(channel, name.length() + 4), // 1 for string length, 1 for bools, 2 for deprecated field + QueueFrame(channel, (uint32_t)(name.length() + 4)), // 1 for string length, 1 for bools, 2 for deprecated field _name(name), _bools(ifUnused, ifEmpty, noWait) {} diff --git a/src/queuepurgeframe.h b/src/queuepurgeframe.h index 08cfad5..016cfd0 100644 --- a/src/queuepurgeframe.h +++ b/src/queuepurgeframe.h @@ -65,7 +65,7 @@ public: * @return newly created Queuepurgeframe */ QueuePurgeFrame(uint16_t channel, const std::string& name, bool noWait = false) : - QueueFrame(channel, name.length() + 4), // 1 extra for string length, 1 for bool, 2 for deprecated field + QueueFrame(channel, (uint32_t)(name.length() + 4)), // 1 extra for string length, 1 for bool, 2 for deprecated field _name(name), _noWait(noWait) {} diff --git a/src/queueunbindframe.h b/src/queueunbindframe.h index 8460663..e40b3b9 100644 --- a/src/queueunbindframe.h +++ b/src/queueunbindframe.h @@ -80,7 +80,7 @@ public: * @param arguments additional arguments, implementation dependant. */ QueueUnbindFrame(uint16_t channel, const std::string& name, const std::string& exchange, const std::string& routingKey = "", const Table& arguments = {} ) : - QueueFrame(channel, (name.length() + exchange.length() + routingKey.length() + arguments.size() + 5) ), // 1 per string, 2 for deprecated field + QueueFrame(channel, (uint32_t)(name.length() + exchange.length() + routingKey.length() + arguments.size() + 5) ), // 1 per string, 2 for deprecated field _name(name), _exchange(exchange), _routingKey(routingKey), diff --git a/src/table.cpp b/src/table.cpp index ac2fd45..d4177f4 100644 --- a/src/table.cpp +++ b/src/table.cpp @@ -21,7 +21,7 @@ Table::Table(ReceivedFrame &frame) ShortString name(frame); // subtract number of bytes to read, plus one byte for the decoded type - bytesToRead -= (name.size() + 1); + bytesToRead -= (uint32_t)(name.size() + 1); // get the field Field *field = Field::decode(frame); @@ -31,7 +31,7 @@ Table::Table(ReceivedFrame &frame) _fields[name] = std::shared_ptr(field); // subtract size - bytesToRead -= field->size(); + bytesToRead -= (uint32_t)field->size(); } }