From 7c0642f30d318b89a2551205ef9b08512d7a131e Mon Sep 17 00:00:00 2001 From: Richard Hodges Date: Fri, 11 Jul 2014 18:17:03 +0100 Subject: [PATCH 01/15] build - added CMakeLists.txt to allow integration into a CMAKE build tree --- CMakeLists.txt | 28 +++++++++++++ amqpcpp.h | 2 +- include/CMakeLists.txt | 28 +++++++++++++ include/compat_endian.h | 31 ++++++++++++++ set_cxx_norm.cmake | 55 +++++++++++++++++++++++++ src/CMakeLists.txt | 90 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 233 insertions(+), 1 deletion(-) create mode 100644 CMakeLists.txt create mode 100644 include/CMakeLists.txt create mode 100644 include/compat_endian.h create mode 100644 set_cxx_norm.cmake create mode 100644 src/CMakeLists.txt diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..92a9595 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,28 @@ +cmake_minimum_required(VERSION 2.8) + +# ensure c++11 on all compilers +include(set_cxx_norm.cmake) +set_cxx_norm (${CXX_NORM_CXX11}) + +macro (add_sources) + file (RELATIVE_PATH _relPath "${CMAKE_SOURCE_DIR}" "${CMAKE_CURRENT_SOURCE_DIR}") + foreach (_src ${ARGN}) + if (_relPath) + list (APPEND SRCS "${_relPath}/${_src}") + else() + list (APPEND SRCS "${_src}") + endif() + endforeach() + if (_relPath) + # propagate SRCS to parent directory + set (SRCS ${SRCS} PARENT_SCOPE) + endif() +endmacro() + +add_subdirectory(src) +add_subdirectory(include) + +include_directories(${CMAKE_SOURCE_DIR}/include) +add_library(amqp-cpp STATIC ${SRCS}) +target_include_directories(amqp-cpp SYSTEM PUBLIC ${CMAKE_SOURCE_DIR}) + diff --git a/amqpcpp.h b/amqpcpp.h index 0708362..0ca4c9d 100644 --- a/amqpcpp.h +++ b/amqpcpp.h @@ -20,7 +20,7 @@ // base C include files #include #include -#include +#include // forward declarations #include diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt new file mode 100644 index 0000000..a5aaa3b --- /dev/null +++ b/include/CMakeLists.txt @@ -0,0 +1,28 @@ +add_sources( +array.h +booleanset.h +channel.h +channelhandler.h +channelimpl.h +classes.h +compat_endian.h +connection.h +connectionhandler.h +connectionimpl.h +decimalfield.h +entityimpl.h +envelope.h +exchangetype.h +field.h +fieldproxy.h +flags.h +login.h +message.h +metadata.h +numericfield.h +outbuffer.h +receivedframe.h +stringfield.h +table.h +watchable.h +) diff --git a/include/compat_endian.h b/include/compat_endian.h new file mode 100644 index 0000000..97c60fc --- /dev/null +++ b/include/compat_endian.h @@ -0,0 +1,31 @@ +#ifndef __AMQP_CPP_COMPAT_ENDIAN_H__ +#define __AMQP_CPP_COMPAT_ENDIAN_H__ + +#if defined(__APPLE__) + +#include + +#include + +#define htobe16(x) OSSwapHostToBigInt16(x) +#define htole16(x) OSSwapHostToLittleInt16(x) +#define be16toh(x) OSSwapBigToHostInt16(x) +#define le16toh(x) OSSwapLittleToHostInt16(x) + +#define htobe32(x) OSSwapHostToBigInt32(x) +#define htole32(x) OSSwapHostToLittleInt32(x) +#define be32toh(x) OSSwapBigToHostInt32(x) +#define le32toh(x) OSSwapLittleToHostInt32(x) + +#define htobe64(x) OSSwapHostToBigInt64(x) +#define htole64(x) OSSwapHostToLittleInt64(x) +#define be64toh(x) OSSwapBigToHostInt64(x) +#define le64toh(x) OSSwapLittleToHostInt64(x) + +#else + +#include + +#endif + +#endif diff --git a/set_cxx_norm.cmake b/set_cxx_norm.cmake new file mode 100644 index 0000000..1d73651 --- /dev/null +++ b/set_cxx_norm.cmake @@ -0,0 +1,55 @@ +# Version +cmake_minimum_required(VERSION 2.6.3) + +set(CXX_NORM_CXX98 1) # C++98 +set(CXX_NORM_CXX03 2) # C++03 +set(CXX_NORM_CXX11 3) # C++11 + +# - Set the wanted C++ norm +# Adds the good argument to the command line in function of the compiler +# Currently only works with g++ and clang++ +macro(set_cxx_norm NORM) + + # Extract c++ compiler --version output + exec_program( + ${CMAKE_CXX_COMPILER} + ARGS --version + OUTPUT_VARIABLE _compiler_output + ) + # Keep only the first line + string(REGEX REPLACE + "(\n.*$)" + "" + cxx_compiler_version "${_compiler_output}" + ) + # Extract the version number + string(REGEX REPLACE + "([^0-9.])|([0-9.][^0-9.])" + "" + cxx_compiler_version "${cxx_compiler_version}" + ) + + if(CMAKE_COMPILER_IS_GNUCXX) + + if(${NORM} EQUAL ${CXX_NORM_CXX98}) + add_definitions("-std=c++98") + elseif(${NORM} EQUAL ${CXX_NORM_CXX03}) + add_definitions("-std=c++03") + elseif(${NORM} EQUAL ${CXX_NORM_CXX11}) + if(${cxx_compiler_version} VERSION_LESS "4.7.0") + add_definitions("-std=c++0x") + else() + add_definitions("-std=c++11") + endif() + endif() + + elseif(${CMAKE_CXX_COMPILER_ID} STREQUAL "Clang") + + if(${NORM} EQUAL ${CXX_NORM_CXX11}) + add_definitions("-std=c++0x") + endif() + + endif() + +endmacro() + diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt new file mode 100644 index 0000000..ca51b25 --- /dev/null +++ b/src/CMakeLists.txt @@ -0,0 +1,90 @@ +add_sources( +array.cpp +basicackframe.h +basiccancelframe.h +basiccancelokframe.h +basicconsumeframe.h +basicconsumeokframe.h +basicdeliverframe.h +basicframe.h +basicgetemptyframe.h +basicgetframe.h +basicgetokframe.h +basicheaderframe.h +basicnackframe.h +basicpublishframe.h +basicqosframe.h +basicqosokframe.h +basicrecoverasyncframe.h +basicrecoverframe.h +basicrecoverokframe.h +basicrejectframe.h +basicreturnframe.h +bodyframe.h +channelcloseframe.h +channelcloseokframe.h +channelflowframe.h +channelflowokframe.h +channelframe.h +channelimpl.cpp +channelopenframe.h +channelopenokframe.h +connectioncloseframe.h +connectioncloseokframe.h +connectionframe.h +connectionimpl.cpp +connectionopenframe.h +connectionopenokframe.h +connectionsecureframe.h +connectionsecureokframe.h +connectionstartframe.h +connectionstartokframe.h +connectiontuneframe.h +connectiontuneokframe.h +consumedmessage.h +exception.h +exchangebindframe.h +exchangebindokframe.h +exchangedeclareframe.h +exchangedeclareokframe.h +exchangedeleteframe.h +exchangedeleteokframe.h +exchangeframe.h +exchangeunbindframe.h +exchangeunbindokframe.h +extframe.h +field.cpp +flags.cpp +frame.h +framecheck.h +headerframe.h +heartbeatframe.h +includes.h +messageimpl.h +methodframe.h +monitor.h +protocolexception.h +protocolheaderframe.h +queuebindframe.h +queuebindokframe.h +queuedeclareframe.h +queuedeclareokframe.h +queuedeleteframe.h +queuedeleteokframe.h +queueframe.h +queuepurgeframe.h +queuepurgeokframe.h +queueunbindframe.h +queueunbindokframe.h +receivedframe.cpp +returnedmessage.h +table.cpp +transactioncommitframe.h +transactioncommitokframe.h +transactionframe.h +transactionrollbackframe.h +transactionrollbackokframe.h +transactionselectframe.h +transactionselectokframe.h +watchable.cpp +) From 43c6e163b1afdd740fc5589137dd7ddefd7f027e Mon Sep 17 00:00:00 2001 From: Richard Hodges Date: Fri, 11 Jul 2014 18:38:05 +0100 Subject: [PATCH 02/15] build - make source files relative to project dir rather than CMAKE_SOURCE_DIR --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 92a9595..d90f0f7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,7 +5,7 @@ include(set_cxx_norm.cmake) set_cxx_norm (${CXX_NORM_CXX11}) macro (add_sources) - file (RELATIVE_PATH _relPath "${CMAKE_SOURCE_DIR}" "${CMAKE_CURRENT_SOURCE_DIR}") + file (RELATIVE_PATH _relPath "${PROJECT_SOURCE_DIR}" "${CMAKE_CURRENT_SOURCE_DIR}") foreach (_src ${ARGN}) if (_relPath) list (APPEND SRCS "${_relPath}/${_src}") From f88fa4b95504a546bd8601f3ea0f7b0be12cc5d1 Mon Sep 17 00:00:00 2001 From: Richard Hodges Date: Fri, 11 Jul 2014 18:45:02 +0100 Subject: [PATCH 03/15] build - add amqp-cpp project declaration and make include directories relative to it --- CMakeLists.txt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d90f0f7..20ec6c3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,7 @@ cmake_minimum_required(VERSION 2.8) +project(amqp-cpp) + # ensure c++11 on all compilers include(set_cxx_norm.cmake) set_cxx_norm (${CXX_NORM_CXX11}) @@ -24,5 +26,5 @@ add_subdirectory(include) include_directories(${CMAKE_SOURCE_DIR}/include) add_library(amqp-cpp STATIC ${SRCS}) -target_include_directories(amqp-cpp SYSTEM PUBLIC ${CMAKE_SOURCE_DIR}) +target_include_directories(amqp-cpp SYSTEM PUBLIC ${PROJECT_SOURCE_DIR}) From b9ca150dba27a4c9bbf380efc494be2a44939e83 Mon Sep 17 00:00:00 2001 From: Richard Hodges Date: Mon, 28 Jul 2014 15:16:32 +0100 Subject: [PATCH 04/15] merged from latest upstream --- CMakeLists.txt | 2 ++ include/CMakeLists.txt | 8 +++++++- src/CMakeLists.txt | 3 ++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 20ec6c3..da1c969 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,3 +28,5 @@ include_directories(${CMAKE_SOURCE_DIR}/include) add_library(amqp-cpp STATIC ${SRCS}) target_include_directories(amqp-cpp SYSTEM PUBLIC ${PROJECT_SOURCE_DIR}) +set(AMQP-CPP_INCLUDE_PATH ${CMAKE_CURRENT_SOURCE_DIR}) +set(AMQP-CPP_INCLUDE_PATH ${CMAKE_CURRENT_SOURCE_DIR} PARENT_SCOPE) diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index a5aaa3b..1e063b2 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -1,8 +1,8 @@ add_sources( array.h booleanset.h +callbacks.h channel.h -channelhandler.h channelimpl.h classes.h compat_endian.h @@ -10,6 +10,11 @@ connection.h connectionhandler.h connectionimpl.h decimalfield.h +deferred.h +deferredcancel.h +deferredconsumer.h +deferreddelete.h +deferredqueue.h entityimpl.h envelope.h exchangetype.h @@ -19,6 +24,7 @@ flags.h login.h message.h metadata.h +monitor.h numericfield.h outbuffer.h receivedframe.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ca51b25..798a7fa 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -42,6 +42,8 @@ connectionstartokframe.h connectiontuneframe.h connectiontuneokframe.h consumedmessage.h +deferredcancel.cpp +deferredconsumer.cpp exception.h exchangebindframe.h exchangebindokframe.h @@ -62,7 +64,6 @@ heartbeatframe.h includes.h messageimpl.h methodframe.h -monitor.h protocolexception.h protocolheaderframe.h queuebindframe.h From d2a97c1dd39ffb4cc8340f5b0e26a92dfca6dfed Mon Sep 17 00:00:00 2001 From: Richard Hodges Date: Wed, 30 Jul 2014 09:39:20 +0100 Subject: [PATCH 05/15] bugfix - fixes bug where channel declaration deferred object is put in fail state after successful send of data refs #12 --- src/channelimpl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 0f6171d..18ceac3 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -117,7 +117,7 @@ Deferred &ChannelImpl::push(Deferred *deferred) Deferred &ChannelImpl::push(const Frame &frame) { // send the frame, and push the result - return push(new Deferred(send(frame))); + return push(new Deferred(!send(frame))); } /** From d4b10cbf35bd65af0c9fe2bd42728ea779e9ba36 Mon Sep 17 00:00:00 2001 From: Richard Hodges Date: Wed, 30 Jul 2014 15:29:05 +0100 Subject: [PATCH 06/15] bug - negate the result of send() before using the result in a DeferredResult constructor --- src/channelimpl.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 18ceac3..860a15d 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -290,7 +290,7 @@ DeferredQueue &ChannelImpl::declareQueue(const std::string &name, int flags, con QueueDeclareFrame frame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, false, arguments); // send the queuedeclareframe - auto *result = new DeferredQueue(send(frame)); + auto *result = new DeferredQueue(!send(frame)); // add the deferred result push(result); @@ -389,7 +389,7 @@ DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags) QueueDeleteFrame frame(_id, name, flags & ifunused, flags & ifempty, false); // send the frame, and create deferred object - auto *deferred = new DeferredDelete(send(frame)); + auto *deferred = new DeferredDelete(!send(frame)); // push to list push(deferred); @@ -495,7 +495,7 @@ DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::stri BasicConsumeFrame frame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, false, arguments); // send the frame, and create deferred object - auto *deferred = new DeferredConsumer(this, send(frame)); + auto *deferred = new DeferredConsumer(this, !send(frame)); // push to list push(deferred); @@ -527,7 +527,7 @@ DeferredCancel &ChannelImpl::cancel(const std::string &tag) BasicCancelFrame frame(_id, tag, false); // send the frame, and create deferred object - auto *deferred = new DeferredCancel(this, send(frame)); + auto *deferred = new DeferredCancel(this, !send(frame)); // push to list push(deferred); From ba3b8ecf0fc5a870c7322b64e4bfdefd4caae495 Mon Sep 17 00:00:00 2001 From: Richard Hodges Date: Sun, 3 Aug 2014 20:22:42 +0100 Subject: [PATCH 07/15] build - added dependencies to library target having figured out that's how it should be done :) --- set_cxx_norm.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/set_cxx_norm.cmake b/set_cxx_norm.cmake index 1d73651..da2af82 100644 --- a/set_cxx_norm.cmake +++ b/set_cxx_norm.cmake @@ -46,7 +46,7 @@ macro(set_cxx_norm NORM) elseif(${CMAKE_CXX_COMPILER_ID} STREQUAL "Clang") if(${NORM} EQUAL ${CXX_NORM_CXX11}) - add_definitions("-std=c++0x") + add_definitions("-std=c++11") endif() endif() From f905c9db49900ec4b4d43fcdbb5fb9956a7ba123 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Thu, 31 Jul 2014 10:10:15 +0200 Subject: [PATCH 08/15] Quality of service now supports "global" parameter (default is still false) --- include/channel.h | 5 +++-- include/channelimpl.h | 5 ++++- src/basicqosframe.h | 2 +- src/channelimpl.cpp | 7 +++++-- 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/include/channel.h b/include/channel.h index e722ebb..3cfc989 100644 --- a/include/channel.h +++ b/include/channel.h @@ -331,11 +331,12 @@ public: * the prefetchCount * * @param prefetchCount maximum number of messages to prefetch + * @param global share counter between all consumers on the same channel * @return bool whether the Qos frame is sent. */ - Deferred &setQos(uint16_t prefetchCount) + Deferred &setQos(uint16_t prefetchCount, bool global = false) { - return _implementation.setQos(prefetchCount); + return _implementation.setQos(prefetchCount, global); } /** diff --git a/include/channelimpl.h b/include/channelimpl.h index 062c160..0812a9c 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -344,8 +344,11 @@ public: * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. + * + * @param count number of messages to pre-fetch + * @param global share count between all consumers on the same channel */ - Deferred &setQos(uint16_t prefetchCount); + Deferred &setQos(uint16_t prefetchCount, bool global = false); /** * Tell the RabbitMQ server that we're ready to consume messages diff --git a/src/basicqosframe.h b/src/basicqosframe.h index 8b9c924..79e269d 100644 --- a/src/basicqosframe.h +++ b/src/basicqosframe.h @@ -55,7 +55,7 @@ public: * * @param channel channel we're working on * @param prefetchCount specifies a prefetch window in terms of whole messages - * @param global apply QoS settings to entire connection + * @param global share prefetch count with all consumers on the same channel * @default false */ BasicQosFrame(uint16_t channel, int16_t prefetchCount = 0, bool global = false) : diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 860a15d..5102090 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -462,11 +462,14 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. + * + * @param prefetchCount number of messages to fetch + * @param global share counter between all consumers on the same channel */ -Deferred &ChannelImpl::setQos(uint16_t prefetchCount) +Deferred &ChannelImpl::setQos(uint16_t prefetchCount, bool global) { // send a qos frame - return push(BasicQosFrame(_id, prefetchCount, false)); + return push(BasicQosFrame(_id, prefetchCount, global)); } /** From b4270f39bc03bdc2f1b3782f386e59a34b1c7165 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Thu, 31 Jul 2014 12:58:13 +0200 Subject: [PATCH 09/15] added Channel::get() that allows one to retrieve a single message from the channel --- amqpcpp.h | 1 + include/callbacks.h | 1 + include/channel.h | 38 +++++++++++- include/channelimpl.h | 39 +++++++++++- include/classes.h | 1 + include/deferred.h | 4 +- include/deferredget.h | 130 +++++++++++++++++++++++++++++++++++++++ src/basicgetemptyframe.h | 21 +++++++ src/basicgetokframe.h | 27 ++++++++ src/channelimpl.cpp | 70 ++++++++++++++++++++- src/consumedmessage.h | 10 +++ src/deferredget.cpp | 64 +++++++++++++++++++ 12 files changed, 397 insertions(+), 9 deletions(-) create mode 100644 include/deferredget.h create mode 100644 src/deferredget.cpp diff --git a/amqpcpp.h b/amqpcpp.h index 328cf08..b63e7fe 100644 --- a/amqpcpp.h +++ b/amqpcpp.h @@ -60,6 +60,7 @@ #include #include #include +#include #include #include #include diff --git a/include/callbacks.h b/include/callbacks.h index a974ef0..c611f89 100644 --- a/include/callbacks.h +++ b/include/callbacks.h @@ -20,6 +20,7 @@ namespace AMQP { using SuccessCallback = std::function; using ErrorCallback = std::function; using FinalizeCallback = std::function; +using EmptyCallback = std::function; using MessageCallback = std::function; using QueueCallback = std::function; using DeleteCallback = std::function; diff --git a/include/channel.h b/include/channel.h index 3cfc989..e4da8bb 100644 --- a/include/channel.h +++ b/include/channel.h @@ -391,9 +391,9 @@ public: * * The onSuccess() callback that you can install should have the following signature: * - * void myCallback(AMQP::Channel *channel, const std::string& tag); + * void myCallback(const std::string& tag); * - * For example: channel.cancel("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) { + * For example: channel.cancel("myqueue").onSuccess([](const std::string& tag) { * * std::cout << "Stopped consuming under tag " << tag << std::endl; * @@ -401,6 +401,40 @@ public: */ DeferredCancel &cancel(const std::string &tag) { return _implementation.cancel(tag); } + /** + * Retrieve a single message from RabbitMQ + * + * When you call this method, you can get one single message from the queue (or none + * at all if the queue is empty). The deferred object that is returned, should be used + * to install a onEmpty() and onSuccess() callback function that will be called + * when the message is consumed and/or when the message could not be consumed. + * + * The following flags are supported: + * + * - noack if set, consumed messages do not have to be acked, this happens automatically + * + * @param queue name of the queue to consume from + * @param flags optional flags + * + * The object returns a deferred handler. Callbacks can be installed + * using onSuccess(), onEmpty(), onError() and onFinalize() methods. + * + * The onSuccess() callback has the following signature: + * + * void myCallback(const Message &message, uint64_t deliveryTag, bool redelivered); + * + * For example: channel.get("myqueue").onSuccess([](const Message &message, uint64_t deliveryTag, bool redelivered) { + * + * std::cout << "Message fetched" << std::endl; + * + * }).onEmpty([]() { + * + * std::cout << "Queue is empty" << std::endl; + * + * }); + */ + DeferredGet &get(const std::string &queue, int flags = 0) { return _implementation.get(queue, flags); } + /** * Acknoldge a received message * diff --git a/include/channelimpl.h b/include/channelimpl.h index 0812a9c..10a31dd 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -381,9 +381,9 @@ public: * * The onSuccess() callback that you can install should have the following signature: * - * void myCallback(AMQP::Channel *channel, const std::string& tag); + * void myCallback(const std::string& tag); * - * For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) { + * For example: channel.declareQueue("myqueue").onSuccess([](const std::string& tag) { * * std::cout << "Started consuming under tag " << tag << std::endl; * @@ -391,6 +391,40 @@ public: */ DeferredCancel &cancel(const std::string &tag); + /** + * Retrieve a single message from RabbitMQ + * + * When you call this method, you can get one single message from the queue (or none + * at all if the queue is empty). The deferred object that is returned, should be used + * to install a onEmpty() and onSuccess() callback function that will be called + * when the message is consumed and/or when the message could not be consumed. + * + * The following flags are supported: + * + * - noack if set, consumed messages do not have to be acked, this happens automatically + * + * @param queue name of the queue to consume from + * @param flags optional flags + * + * The object returns a deferred handler. Callbacks can be installed + * using onSuccess(), onEmpty(), onError() and onFinalize() methods. + * + * The onSuccess() callback has the following signature: + * + * void myCallback(const Message &message, uint64_t deliveryTag, bool redelivered); + * + * For example: channel.get("myqueue").onSuccess([](const Message &message, uint64_t deliveryTag, bool redelivered) { + * + * std::cout << "Message fetched" << std::endl; + * + * }).onEmpty([]() { + * + * std::cout << "Queue is empty" << std::endl; + * + * }); + */ + DeferredGet &get(const std::string &queue, int flags = 0); + /** * Acknowledge a message * @param deliveryTag the delivery tag @@ -596,6 +630,7 @@ public: * @return ConsumedMessage */ ConsumedMessage *message(const BasicDeliverFrame &frame); + ConsumedMessage *message(const BasicGetOKFrame &frame); /** * Retrieve the current incoming message diff --git a/include/classes.h b/include/classes.h index b4a5500..ec9e38e 100644 --- a/include/classes.h +++ b/include/classes.h @@ -16,6 +16,7 @@ namespace AMQP { */ class Array; class BasicDeliverFrame; +class BasicGetOKFrame; class BasicHeaderFrame; class BasicReturnFrame; class BodyFrame; diff --git a/include/deferred.h b/include/deferred.h index 01a3fe8..3d950cf 100644 --- a/include/deferred.h +++ b/include/deferred.h @@ -67,7 +67,7 @@ protected: * Indicate success * @return Deferred Next deferred result */ - Deferred *reportSuccess() const + virtual Deferred *reportSuccess() const { // execute callbacks if registered if (_successCallback) _successCallback(); @@ -111,7 +111,7 @@ protected: // this is the same as a regular success message return reportSuccess(); } - + /** * Indicate failure * @param error Description of the error that occured diff --git a/include/deferredget.h b/include/deferredget.h new file mode 100644 index 0000000..6642c35 --- /dev/null +++ b/include/deferredget.h @@ -0,0 +1,130 @@ +/** + * DeferredGet.h + * + * @author Emiel Bruijntjes + * @copyright 2014 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class DeferredGet : public Deferred +{ +private: + /** + * Pointer to the channel + * @var ChannelImpl + */ + ChannelImpl *_channel; + + /** + * Callback for incoming messages + * @var MessageCallback + */ + MessageCallback _messageCallback; + + /** + * Callback in case the queue is empty + * @var EmptyCallback + */ + EmptyCallback _emptyCallback; + + /** + * Report success when a message is indeed expected + * @param count number of messages in the queue + * @return Deferred + */ + virtual Deferred *reportSuccess(uint32_t messagecount) const override; + + /** + * Report success when queue was empty + * @return Deferred + */ + virtual Deferred *reportSuccess() const override; + + /** + * The channel implementation may call our + * private members and construct us + */ + friend class ChannelImpl; + friend class ConsumedMessage; + + +protected: + /** + * Protected constructor that can only be called + * from within the channel implementation + * + * @param channel the channel implementation + * @param failed are we already failed? + */ + DeferredGet(ChannelImpl *channel, bool failed = false) : + Deferred(failed), _channel(channel) {} + +public: + /** + * Register a function to be called when a message arrives + * This fuction is also available as onReceived() and onMessage() because I always forget which name I gave to it + * @param callback + */ + DeferredGet &onSuccess(const MessageCallback &callback) + { + // store the callback + _messageCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a function to be called when a message arrives + * This fuction is also available as onSuccess() and onMessage() because I always forget which name I gave to it + * @param callback the callback to execute + */ + DeferredGet &onReceived(const MessageCallback &callback) + { + // store callback + _messageCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a function to be called when a message arrives + * This fuction is also available as onSuccess() and onReceived() because I always forget which name I gave to it + * @param callback the callback to execute + */ + DeferredGet &onMessage(const MessageCallback &callback) + { + // store callback + _messageCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a function to be called if no message could be fetched + * @param callback the callback to execute + */ + DeferredGet &onEmpty(const EmptyCallback &callback) + { + // store callback + _emptyCallback = callback; + + // allow chaining + return *this; + } +}; + +/** + * End of namespace + */ +} + diff --git a/src/basicgetemptyframe.h b/src/basicgetemptyframe.h index d74af57..6ba5ace 100644 --- a/src/basicgetemptyframe.h +++ b/src/basicgetemptyframe.h @@ -69,6 +69,27 @@ public: { return 72; } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if (!channel) return false; + + // report + if (channel->reportSuccess()) channel->synchronized(); + + // done + return true; + } + }; /** diff --git a/src/basicgetokframe.h b/src/basicgetokframe.h index 5abc6f7..6945adb 100644 --- a/src/basicgetokframe.h +++ b/src/basicgetokframe.h @@ -156,6 +156,33 @@ public: { return _redelivered.get(0); } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if (!channel) return false; + + // report (if this function returns false, it means that the channel + // object no longer is valid) + if (!channel->reportSuccess(_messageCount)) return true; + + // construct the message + channel->message(*this); + + // we're synchronized + channel->synchronized(); + + // done + return true; + } }; /** diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 5102090..9d30ab9 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -7,6 +7,7 @@ */ #include "includes.h" #include "basicdeliverframe.h" +#include "basicgetokframe.h" #include "basicreturnframe.h" #include "messageimpl.h" #include "consumedmessage.h" @@ -37,6 +38,8 @@ #include "basicnackframe.h" #include "basicrecoverframe.h" #include "basicrejectframe.h" +#include "basicgetframe.h" + /** * Set up namespace @@ -516,9 +519,9 @@ DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::stri * * The onSuccess() callback that you can install should have the following signature: * - * void myCallback(AMQP::Channel *channel, const std::string& tag); + * void myCallback(const std::string& tag); * - * For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) { + * For example: channel.declareQueue("myqueue").onSuccess([](const std::string& tag) { * * std::cout << "Started consuming under tag " << tag << std::endl; * @@ -539,6 +542,53 @@ DeferredCancel &ChannelImpl::cancel(const std::string &tag) return *deferred; } +/** + * Retrieve a single message from RabbitMQ + * + * When you call this method, you can get one single message from the queue (or none + * at all if the queue is empty). The deferred object that is returned, should be used + * to install a onEmpty() and onSuccess() callback function that will be called + * when the message is consumed and/or when the message could not be consumed. + * + * The following flags are supported: + * + * - noack if set, consumed messages do not have to be acked, this happens automatically + * + * @param queue name of the queue to consume from + * @param flags optional flags + * + * The object returns a deferred handler. Callbacks can be installed + * using onSuccess(), onEmpty(), onError() and onFinalize() methods. + * + * The onSuccess() callback has the following signature: + * + * void myCallback(const Message &message, uint64_t deliveryTag, bool redelivered); + * + * For example: channel.get("myqueue").onSuccess([](const Message &message, uint64_t deliveryTag, bool redelivered) { + * + * std::cout << "Message fetched" << std::endl; + * + * }).onEmpty([]() { + * + * std::cout << "Queue is empty" << std::endl; + * + * }); + */ +DeferredGet &ChannelImpl::get(const std::string &queue, int flags) +{ + // the get frame to send + BasicGetFrame frame(_id, queue, flags & noack); + + // send the frame, and create deferred object + auto *deferred = new DeferredGet(this, send(frame)); + + // push to list + push(deferred); + + // done + return *deferred; +} + /** * Acknowledge a message * @param deliveryTag the delivery tag @@ -674,7 +724,7 @@ void ChannelImpl::reportMessage() } /** - * Create an incoming message + * Create an incoming message from a consume call * @param frame * @return ConsumedMessage */ @@ -687,6 +737,20 @@ ConsumedMessage *ChannelImpl::message(const BasicDeliverFrame &frame) return _message = new ConsumedMessage(frame); } +/** + * Create an incoming message from a get call + * @param frame + * @return ConsumedMessage + */ +ConsumedMessage *ChannelImpl::message(const BasicGetOKFrame &frame) +{ + // destruct if message is already set + if (_message) delete _message; + + // construct message + return _message = new ConsumedMessage(frame); +} + /** * End of namespace */ diff --git a/src/consumedmessage.h b/src/consumedmessage.h index 447bdc6..09c9c0e 100644 --- a/src/consumedmessage.h +++ b/src/consumedmessage.h @@ -44,6 +44,16 @@ public: _consumerTag(frame.consumerTag()), _deliveryTag(frame.deliveryTag()), _redelivered(frame.redelivered()) {} + /** + * Constructor + * @param frame + */ + ConsumedMessage(const BasicGetOKFrame &frame) : + MessageImpl(frame.exchange(), frame.routingKey()), + _deliveryTag(frame.deliveryTag()), _redelivered(frame.redelivered()) + {} + + /** * Destructor */ diff --git a/src/deferredget.cpp b/src/deferredget.cpp new file mode 100644 index 0000000..f412ba4 --- /dev/null +++ b/src/deferredget.cpp @@ -0,0 +1,64 @@ +/** + * DeferredGet.cpp + * + * Implementation of the DeferredGet call + * + * @author Emiel Bruijntjes + * @copyright 2014 Copernica BV + */ + +/** + * Dependencies + */ +#include "includes.h" + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Report success, a get message succeeded and the message is expected soon + * @param messageCount Message count + * @return Deferred + */ +Deferred *DeferredGet::reportSuccess(uint32_t messageCount) const +{ + // make copies of the callbacks + auto messageCallback = _messageCallback; + auto finalizeCallback = _finalizeCallback; + + // we now know the name, so we can install the message callback on the channel + _channel->install("", [messageCallback, finalizeCallback](const Message &message, uint64_t deliveryTag, bool redelivered) { + + // call the callbacks + if (messageCallback) messageCallback(message, deliveryTag, redelivered); + + // call the finalize callback + if (finalizeCallback) finalizeCallback(); + }); + + // return next object + return _next; +} + +/** + * Report success, although no message could be get + */ +Deferred *DeferredGet::reportSuccess() const +{ + // check if a callback was set + if (_emptyCallback) _emptyCallback(); + + // call finalize callback + if (_finalizeCallback) _finalizeCallback(); + + // return next object + return _next; +} + +/** + * End of namespace + */ +} + From bcbe50e22cc70b6129d28b7e0921180993aa3a1f Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Fri, 1 Aug 2014 10:05:02 +0200 Subject: [PATCH 10/15] basic.get callbacks only have to be installed for a single message, so we uninstall it right after the message was handled --- src/deferredget.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/deferredget.cpp b/src/deferredget.cpp index f412ba4..68c1383 100644 --- a/src/deferredget.cpp +++ b/src/deferredget.cpp @@ -27,9 +27,13 @@ Deferred *DeferredGet::reportSuccess(uint32_t messageCount) const // make copies of the callbacks auto messageCallback = _messageCallback; auto finalizeCallback = _finalizeCallback; + auto *channel = _channel; // we now know the name, so we can install the message callback on the channel - _channel->install("", [messageCallback, finalizeCallback](const Message &message, uint64_t deliveryTag, bool redelivered) { + _channel->install("", [channel, messageCallback, finalizeCallback](const Message &message, uint64_t deliveryTag, bool redelivered) { + + // we can remove the callback now from the channel + channel->uninstall(""); // call the callbacks if (messageCallback) messageCallback(message, deliveryTag, redelivered); From dca76db2f04a106b0dbcdf501950c83c27f74ea7 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Fri, 1 Aug 2014 10:37:18 +0200 Subject: [PATCH 11/15] empty messages could not be consumed or get, this has now been fixed --- src/basicheaderframe.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/basicheaderframe.h b/src/basicheaderframe.h index ac0f644..f6cc056 100644 --- a/src/basicheaderframe.h +++ b/src/basicheaderframe.h @@ -119,11 +119,14 @@ public: if (!message) return false; // store size - message->setBodySize(bodySize()); + message->setBodySize(_bodySize); // and copy the meta data message->set(_metadata); + // for empty bodies we're ready now + if (_bodySize == 0) channel->reportMessage(); + // done return true; } From 37a51cdc7b71932532d079181ab6a1d1e57888fb Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Fri, 1 Aug 2014 11:55:07 +0200 Subject: [PATCH 12/15] basic.get is asynchronous until the entire body has been received, only then subsequent messages are sent + first work in progress on implementing smarter buffers --- amqpcpp.h | 2 ++ include/buffer.h | 42 +++++++++++++++++++++++++++++++ include/bytebuffer.h | 57 +++++++++++++++++++++++++++++++++++++++++++ include/channel.h | 1 + include/connection.h | 26 +++++++++++++++++++- src/basicgetokframe.h | 4 +-- src/channelimpl.cpp | 12 ++++++--- 7 files changed, 138 insertions(+), 6 deletions(-) create mode 100644 include/buffer.h create mode 100644 include/bytebuffer.h diff --git a/amqpcpp.h b/amqpcpp.h index b63e7fe..fce3baa 100644 --- a/amqpcpp.h +++ b/amqpcpp.h @@ -37,6 +37,8 @@ #include // amqp types +#include +#include #include #include #include diff --git a/include/buffer.h b/include/buffer.h new file mode 100644 index 0000000..73eaf13 --- /dev/null +++ b/include/buffer.h @@ -0,0 +1,42 @@ +/** + * Buffer.h + * + * Interface that can be implemented by client applications and that + * is parsed to the Connection::parse() method. + * + * Normally, the Connection::parse() method is fed with a byte + * array. However, if you're receiving big frames, it may be inconvenient + * to copy these big frames into continguous byte arrays, and you + * prefer using objects that internally use linked lists or other + * ways to store the bytes. In such sitations, you can implement this + * interface and pass that to the connection. + * + * @author Emiel Bruijntjes + * @copyright 2014 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class Buffer +{ + + +}; + +/** + * End of namespace + */ +} + + diff --git a/include/bytebuffer.h b/include/bytebuffer.h new file mode 100644 index 0000000..7074ee0 --- /dev/null +++ b/include/bytebuffer.h @@ -0,0 +1,57 @@ +/** + * ByteByffer.h + * + * Very simple implementation of the buffer class that simply wraps + * around a buffer of bytes + * + * @author Emiel Bruijntjes + * @copyright 2014 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Open namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class ByteBuffer : public Buffer +{ +private: + /** + * The actual byte buffer + * @var const char * + */ + const char *_data; + + /** + * Size of the buffer + * @var size_t + */ + size_t _size; + +public: + /** + * Constructor + * @param data + * @param size + */ + ByteBuffer(const char *data, size_t size) : _data(data), _size(size) {} + + /** + * Destructor + */ + virtual ~ByteBuffer() {} + +}; + +/** + * End namespace + */ +} diff --git a/include/channel.h b/include/channel.h index e4da8bb..0473c5e 100644 --- a/include/channel.h +++ b/include/channel.h @@ -320,6 +320,7 @@ public: bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation.publish(exchange, routingKey, envelope); } bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation.publish(exchange, routingKey, Envelope(message)); } bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, Envelope(message, size)); } + bool publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation.publish(exchange, routingKey, Envelope(message, strlen(message))); } /** * Set the Quality of Service (QOS) for this channel diff --git a/include/connection.h b/include/connection.h index a67e6ad..c1fccea 100644 --- a/include/connection.h +++ b/include/connection.h @@ -77,8 +77,32 @@ public: */ size_t parse(const char *buffer, size_t size) { - return _implementation.parse(buffer, size); + //return _implementation.parse(ByteBuffer(buffer, size)); + return _implementation.parse(buffer, size); } + + /** + * Parse data that was recevied from RabbitMQ + * + * Every time that data comes in from RabbitMQ, you should call this method to parse + * the incoming data, and let it handle by the AMQP library. This method returns the number + * of bytes that were processed. + * + * If not all bytes could be processed because it only contained a partial frame, you should + * call this same method later on when more data is available. The AMQP library does not do + * any buffering, so it is up to the caller to ensure that the old data is also passed in that + * later call. + * + * This method accepts a buffer object. This is an interface that is defined by the AMQP + * library, that can be implemented by you to allow faster access to a buffer. + * + * @param buffer buffer to decode + * @return number of bytes that were processed + */ + //size_t parse(const Buffer &buffer) + //{ + // return _implementation.parse(buffer); + //} /** * Close the connection diff --git a/src/basicgetokframe.h b/src/basicgetokframe.h index 6945adb..ebccacc 100644 --- a/src/basicgetokframe.h +++ b/src/basicgetokframe.h @@ -177,8 +177,8 @@ public: // construct the message channel->message(*this); - // we're synchronized - channel->synchronized(); + // notice that the channel is not yet synchronized here, because + // we first have to receive the entire body // done return true; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 9d30ab9..8eb2e0f 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -703,6 +703,15 @@ void ChannelImpl::reportMessage() // skip if there is no message if (!_message) return; + // after the report the channel may be destructed, monitor that + Monitor monitor(this); + + // synchronize the channel if this comes from a basic.get frame + if (_message->consumer().empty()) synchronized(); + + // syncing the channel may destruct the channel + if (!monitor.valid()) return; + // look for the consumer auto iter = _consumers.find(_message->consumer()); if (iter == _consumers.end()) return; @@ -710,9 +719,6 @@ void ChannelImpl::reportMessage() // is this a valid callback method if (!iter->second) return; - // after the report the channel may be destructed, monitor that - Monitor monitor(this); - // call the callback _message->report(iter->second); From a8ff6de5505f92ac5dd708a876e26eaa7cd097ef Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Mon, 4 Aug 2014 13:44:17 +0200 Subject: [PATCH 13/15] fixed bug in channel.get() calls --- src/deferredget.cpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/deferredget.cpp b/src/deferredget.cpp index 68c1383..0cf24a9 100644 --- a/src/deferredget.cpp +++ b/src/deferredget.cpp @@ -32,14 +32,17 @@ Deferred *DeferredGet::reportSuccess(uint32_t messageCount) const // we now know the name, so we can install the message callback on the channel _channel->install("", [channel, messageCallback, finalizeCallback](const Message &message, uint64_t deliveryTag, bool redelivered) { - // we can remove the callback now from the channel - channel->uninstall(""); - + // install a monitor to deal with the case that the channel is removed + Monitor monitor(channel); + // call the callbacks if (messageCallback) messageCallback(message, deliveryTag, redelivered); // call the finalize callback if (finalizeCallback) finalizeCallback(); + + // we can remove the callback now from the channel + if (monitor.valid()) channel->uninstall(""); }); // return next object From 4acee206d2082e59bc136e3bbb0d718d0593e2ec Mon Sep 17 00:00:00 2001 From: Richard Hodges Date: Thu, 7 Aug 2014 10:59:37 +0100 Subject: [PATCH 14/15] testing - added stub to build a test based on asio --- CMakeLists.txt | 1 + amqp_boost_test/CMakeLists.txt | 37 ++++++++++++++++++++++++++++++++++ amqp_boost_test/main.cpp | 8 ++++++++ 3 files changed, 46 insertions(+) create mode 100644 amqp_boost_test/CMakeLists.txt create mode 100644 amqp_boost_test/main.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index da1c969..4670b58 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,6 +23,7 @@ endmacro() add_subdirectory(src) add_subdirectory(include) +add_subdirectory(amqp_boost_test) include_directories(${CMAKE_SOURCE_DIR}/include) add_library(amqp-cpp STATIC ${SRCS}) diff --git a/amqp_boost_test/CMakeLists.txt b/amqp_boost_test/CMakeLists.txt new file mode 100644 index 0000000..0669683 --- /dev/null +++ b/amqp_boost_test/CMakeLists.txt @@ -0,0 +1,37 @@ +set(Boost_USE_STATIC_LIBS ON) +set(Boost_USE_MULTITHREADED ON) +set(Boost_USE_STATIC_RUNTIME OFF) +find_package(Boost 1.55 REQUIRED COMPONENTS thread system) + +if(NOT Boost_FOUND) + message( FATAL_ERROR "boost must be installed") +endif() + +include_directories(${Boost_INCLUDE_DIRS}) + +set(SRC main.cpp) + +if("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux") +set(SPECIFIC_HOST_LIBS rt pthread) +else() +set(SPECIFIC_HOST_LIBS) +endif() + +set(LIBS +${Boost_LIBRARIES} +amqp-cpp +) + +include_directories(SYSTEM ${CRYPTLIB_INCLUDE_PATH}) +link_directories(${Boost_LIBRARY_PATH}) + +include_directories(SYSTEM ${AMQP-CPP_INCLUDE_PATH}) + +add_executable(amqp_boost_test ${SRC}) +target_link_libraries(amqp_boost_test ${LIBS} ${SPECIFIC_HOST_LIBS}) + +install(TARGETS amqp_boost_test + RUNTIME DESTINATION bin + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib +) diff --git a/amqp_boost_test/main.cpp b/amqp_boost_test/main.cpp new file mode 100644 index 0000000..0dd80bc --- /dev/null +++ b/amqp_boost_test/main.cpp @@ -0,0 +1,8 @@ +#include + +int main(int argc, const char* argv[]) +{ + + + return 0; +} \ No newline at end of file From d2a84b5f8d3432fd526596f7121aee6a50da9ced Mon Sep 17 00:00:00 2001 From: Richard Hodges Date: Thu, 7 Aug 2014 11:09:02 +0100 Subject: [PATCH 15/15] build - added new source files to CMakeLists --- include/CMakeLists.txt | 3 +++ src/CMakeLists.txt | 1 + 2 files changed, 4 insertions(+) diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index 1e063b2..db219a8 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -1,6 +1,8 @@ add_sources( array.h booleanset.h +buffer.h +bytebuffer.h callbacks.h channel.h channelimpl.h @@ -14,6 +16,7 @@ deferred.h deferredcancel.h deferredconsumer.h deferreddelete.h +deferredget.h deferredqueue.h entityimpl.h envelope.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 798a7fa..b9c8707 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -44,6 +44,7 @@ connectiontuneokframe.h consumedmessage.h deferredcancel.cpp deferredconsumer.cpp +deferredget.cpp exception.h exchangebindframe.h exchangebindokframe.h