diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..4670b58 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,33 @@ +cmake_minimum_required(VERSION 2.8) + +project(amqp-cpp) + +# ensure c++11 on all compilers +include(set_cxx_norm.cmake) +set_cxx_norm (${CXX_NORM_CXX11}) + +macro (add_sources) + file (RELATIVE_PATH _relPath "${PROJECT_SOURCE_DIR}" "${CMAKE_CURRENT_SOURCE_DIR}") + foreach (_src ${ARGN}) + if (_relPath) + list (APPEND SRCS "${_relPath}/${_src}") + else() + list (APPEND SRCS "${_src}") + endif() + endforeach() + if (_relPath) + # propagate SRCS to parent directory + set (SRCS ${SRCS} PARENT_SCOPE) + endif() +endmacro() + +add_subdirectory(src) +add_subdirectory(include) +add_subdirectory(amqp_boost_test) + +include_directories(${CMAKE_SOURCE_DIR}/include) +add_library(amqp-cpp STATIC ${SRCS}) +target_include_directories(amqp-cpp SYSTEM PUBLIC ${PROJECT_SOURCE_DIR}) + +set(AMQP-CPP_INCLUDE_PATH ${CMAKE_CURRENT_SOURCE_DIR}) +set(AMQP-CPP_INCLUDE_PATH ${CMAKE_CURRENT_SOURCE_DIR} PARENT_SCOPE) diff --git a/amqp_boost_test/CMakeLists.txt b/amqp_boost_test/CMakeLists.txt new file mode 100644 index 0000000..0669683 --- /dev/null +++ b/amqp_boost_test/CMakeLists.txt @@ -0,0 +1,37 @@ +set(Boost_USE_STATIC_LIBS ON) +set(Boost_USE_MULTITHREADED ON) +set(Boost_USE_STATIC_RUNTIME OFF) +find_package(Boost 1.55 REQUIRED COMPONENTS thread system) + +if(NOT Boost_FOUND) + message( FATAL_ERROR "boost must be installed") +endif() + +include_directories(${Boost_INCLUDE_DIRS}) + +set(SRC main.cpp) + +if("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux") +set(SPECIFIC_HOST_LIBS rt pthread) +else() +set(SPECIFIC_HOST_LIBS) +endif() + +set(LIBS +${Boost_LIBRARIES} +amqp-cpp +) + +include_directories(SYSTEM ${CRYPTLIB_INCLUDE_PATH}) +link_directories(${Boost_LIBRARY_PATH}) + +include_directories(SYSTEM ${AMQP-CPP_INCLUDE_PATH}) + +add_executable(amqp_boost_test ${SRC}) +target_link_libraries(amqp_boost_test ${LIBS} ${SPECIFIC_HOST_LIBS}) + +install(TARGETS amqp_boost_test + RUNTIME DESTINATION bin + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib +) diff --git a/amqp_boost_test/main.cpp b/amqp_boost_test/main.cpp new file mode 100644 index 0000000..0dd80bc --- /dev/null +++ b/amqp_boost_test/main.cpp @@ -0,0 +1,8 @@ +#include + +int main(int argc, const char* argv[]) +{ + + + return 0; +} \ No newline at end of file diff --git a/amqpcpp.h b/amqpcpp.h index c94a43a..3b72514 100644 --- a/amqpcpp.h +++ b/amqpcpp.h @@ -25,7 +25,9 @@ // base C include files #include #include -#include + +// include compatibilities for apple +#include // forward declarations #include diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt new file mode 100644 index 0000000..db219a8 --- /dev/null +++ b/include/CMakeLists.txt @@ -0,0 +1,37 @@ +add_sources( +array.h +booleanset.h +buffer.h +bytebuffer.h +callbacks.h +channel.h +channelimpl.h +classes.h +compat_endian.h +connection.h +connectionhandler.h +connectionimpl.h +decimalfield.h +deferred.h +deferredcancel.h +deferredconsumer.h +deferreddelete.h +deferredget.h +deferredqueue.h +entityimpl.h +envelope.h +exchangetype.h +field.h +fieldproxy.h +flags.h +login.h +message.h +metadata.h +monitor.h +numericfield.h +outbuffer.h +receivedframe.h +stringfield.h +table.h +watchable.h +) diff --git a/include/apple.h b/include/apple.h new file mode 100644 index 0000000..97c60fc --- /dev/null +++ b/include/apple.h @@ -0,0 +1,31 @@ +#ifndef __AMQP_CPP_COMPAT_ENDIAN_H__ +#define __AMQP_CPP_COMPAT_ENDIAN_H__ + +#if defined(__APPLE__) + +#include + +#include + +#define htobe16(x) OSSwapHostToBigInt16(x) +#define htole16(x) OSSwapHostToLittleInt16(x) +#define be16toh(x) OSSwapBigToHostInt16(x) +#define le16toh(x) OSSwapLittleToHostInt16(x) + +#define htobe32(x) OSSwapHostToBigInt32(x) +#define htole32(x) OSSwapHostToLittleInt32(x) +#define be32toh(x) OSSwapBigToHostInt32(x) +#define le32toh(x) OSSwapLittleToHostInt32(x) + +#define htobe64(x) OSSwapHostToBigInt64(x) +#define htole64(x) OSSwapHostToLittleInt64(x) +#define be64toh(x) OSSwapBigToHostInt64(x) +#define le64toh(x) OSSwapLittleToHostInt64(x) + +#else + +#include + +#endif + +#endif diff --git a/include/bytebuffer.h b/include/bytebuffer.h index dd256ab..75c8776 100644 --- a/include/bytebuffer.h +++ b/include/bytebuffer.h @@ -90,7 +90,6 @@ public: { return memcpy(buffer, _data + pos, size); } - }; /** diff --git a/include/connection.h b/include/connection.h index 99c616d..358396b 100644 --- a/include/connection.h +++ b/include/connection.h @@ -102,7 +102,7 @@ public: { return _implementation.parse(buffer); } - + /** * Close the connection * This will close all channels diff --git a/set_cxx_norm.cmake b/set_cxx_norm.cmake new file mode 100644 index 0000000..da2af82 --- /dev/null +++ b/set_cxx_norm.cmake @@ -0,0 +1,55 @@ +# Version +cmake_minimum_required(VERSION 2.6.3) + +set(CXX_NORM_CXX98 1) # C++98 +set(CXX_NORM_CXX03 2) # C++03 +set(CXX_NORM_CXX11 3) # C++11 + +# - Set the wanted C++ norm +# Adds the good argument to the command line in function of the compiler +# Currently only works with g++ and clang++ +macro(set_cxx_norm NORM) + + # Extract c++ compiler --version output + exec_program( + ${CMAKE_CXX_COMPILER} + ARGS --version + OUTPUT_VARIABLE _compiler_output + ) + # Keep only the first line + string(REGEX REPLACE + "(\n.*$)" + "" + cxx_compiler_version "${_compiler_output}" + ) + # Extract the version number + string(REGEX REPLACE + "([^0-9.])|([0-9.][^0-9.])" + "" + cxx_compiler_version "${cxx_compiler_version}" + ) + + if(CMAKE_COMPILER_IS_GNUCXX) + + if(${NORM} EQUAL ${CXX_NORM_CXX98}) + add_definitions("-std=c++98") + elseif(${NORM} EQUAL ${CXX_NORM_CXX03}) + add_definitions("-std=c++03") + elseif(${NORM} EQUAL ${CXX_NORM_CXX11}) + if(${cxx_compiler_version} VERSION_LESS "4.7.0") + add_definitions("-std=c++0x") + else() + add_definitions("-std=c++11") + endif() + endif() + + elseif(${CMAKE_CXX_COMPILER_ID} STREQUAL "Clang") + + if(${NORM} EQUAL ${CXX_NORM_CXX11}) + add_definitions("-std=c++11") + endif() + + endif() + +endmacro() + diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt new file mode 100644 index 0000000..b9c8707 --- /dev/null +++ b/src/CMakeLists.txt @@ -0,0 +1,92 @@ +add_sources( +array.cpp +basicackframe.h +basiccancelframe.h +basiccancelokframe.h +basicconsumeframe.h +basicconsumeokframe.h +basicdeliverframe.h +basicframe.h +basicgetemptyframe.h +basicgetframe.h +basicgetokframe.h +basicheaderframe.h +basicnackframe.h +basicpublishframe.h +basicqosframe.h +basicqosokframe.h +basicrecoverasyncframe.h +basicrecoverframe.h +basicrecoverokframe.h +basicrejectframe.h +basicreturnframe.h +bodyframe.h +channelcloseframe.h +channelcloseokframe.h +channelflowframe.h +channelflowokframe.h +channelframe.h +channelimpl.cpp +channelopenframe.h +channelopenokframe.h +connectioncloseframe.h +connectioncloseokframe.h +connectionframe.h +connectionimpl.cpp +connectionopenframe.h +connectionopenokframe.h +connectionsecureframe.h +connectionsecureokframe.h +connectionstartframe.h +connectionstartokframe.h +connectiontuneframe.h +connectiontuneokframe.h +consumedmessage.h +deferredcancel.cpp +deferredconsumer.cpp +deferredget.cpp +exception.h +exchangebindframe.h +exchangebindokframe.h +exchangedeclareframe.h +exchangedeclareokframe.h +exchangedeleteframe.h +exchangedeleteokframe.h +exchangeframe.h +exchangeunbindframe.h +exchangeunbindokframe.h +extframe.h +field.cpp +flags.cpp +frame.h +framecheck.h +headerframe.h +heartbeatframe.h +includes.h +messageimpl.h +methodframe.h +protocolexception.h +protocolheaderframe.h +queuebindframe.h +queuebindokframe.h +queuedeclareframe.h +queuedeclareokframe.h +queuedeleteframe.h +queuedeleteokframe.h +queueframe.h +queuepurgeframe.h +queuepurgeokframe.h +queueunbindframe.h +queueunbindokframe.h +receivedframe.cpp +returnedmessage.h +table.cpp +transactioncommitframe.h +transactioncommitokframe.h +transactionframe.h +transactionrollbackframe.h +transactionrollbackokframe.h +transactionselectframe.h +transactionselectokframe.h +watchable.cpp +) diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 2664216..724fb58 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -293,7 +293,7 @@ DeferredQueue &ChannelImpl::declareQueue(const std::string &name, int flags, con QueueDeclareFrame frame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, false, arguments); // send the queuedeclareframe - auto *result = new DeferredQueue(send(frame)); + auto *result = new DeferredQueue(!send(frame)); // add the deferred result push(result); @@ -392,7 +392,7 @@ DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags) QueueDeleteFrame frame(_id, name, flags & ifunused, flags & ifempty, false); // send the frame, and create deferred object - auto *deferred = new DeferredDelete(send(frame)); + auto *deferred = new DeferredDelete(!send(frame)); // push to list push(deferred); @@ -501,7 +501,7 @@ DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::stri BasicConsumeFrame frame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, false, arguments); // send the frame, and create deferred object - auto *deferred = new DeferredConsumer(this, send(frame)); + auto *deferred = new DeferredConsumer(this, !send(frame)); // push to list push(deferred); @@ -533,7 +533,54 @@ DeferredCancel &ChannelImpl::cancel(const std::string &tag) BasicCancelFrame frame(_id, tag, false); // send the frame, and create deferred object - auto *deferred = new DeferredCancel(this, send(frame)); + auto *deferred = new DeferredCancel(this, !send(frame)); + + // push to list + push(deferred); + + // done + return *deferred; +} + +/** + * Retrieve a single message from RabbitMQ + * + * When you call this method, you can get one single message from the queue (or none + * at all if the queue is empty). The deferred object that is returned, should be used + * to install a onEmpty() and onSuccess() callback function that will be called + * when the message is consumed and/or when the message could not be consumed. + * + * The following flags are supported: + * + * - noack if set, consumed messages do not have to be acked, this happens automatically + * + * @param queue name of the queue to consume from + * @param flags optional flags + * + * The object returns a deferred handler. Callbacks can be installed + * using onSuccess(), onEmpty(), onError() and onFinalize() methods. + * + * The onSuccess() callback has the following signature: + * + * void myCallback(const Message &message, uint64_t deliveryTag, bool redelivered); + * + * For example: channel.get("myqueue").onSuccess([](const Message &message, uint64_t deliveryTag, bool redelivered) { + * + * std::cout << "Message fetched" << std::endl; + * + * }).onEmpty([]() { + * + * std::cout << "Queue is empty" << std::endl; + * + * }); + */ +DeferredGet &ChannelImpl::get(const std::string &queue, int flags) +{ + // the get frame to send + BasicGetFrame frame(_id, queue, flags & noack); + + // send the frame, and create deferred object + auto *deferred = new DeferredGet(this, send(frame)); // push to list push(deferred);