Merge pull request #3 from CopernicaMarketingSoftware/master

merge latest from amqp-cpp upstream
This commit is contained in:
Steven G 2018-02-22 09:36:24 +01:00 committed by GitHub
commit bfa6f1d15c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 73 additions and 30 deletions

View File

@ -28,6 +28,14 @@ matrix:
# Linux / GCC # Linux / GCC
################ ################
- os: linux
compiler: gcc
env:
- COMPILER_PACKAGE=g++-5
- C_COMPILER=gcc-5
- CXX_COMPILER=g++-5
- CXXFLAGS=-std=c++11
- os: linux - os: linux
env: env:
- COMPILER_PACKAGE=g++-6 - COMPILER_PACKAGE=g++-6
@ -86,6 +94,6 @@ script:
export CC=/usr/bin/$C_COMPILER && export CC=/usr/bin/$C_COMPILER &&
export CXX=/usr/bin/$CXX_COMPILER && export CXX=/usr/bin/$CXX_COMPILER &&
mkdir build.release && cd build.release && mkdir build.release && cd build.release &&
cmake .. ${CMAKE_OPTIONS} -DAMQP-CPP_BUILD_EXAMPLES=ON -DAMQP-CPP_LINUX_TCP=ON -GNinja && cmake ${CMAKE_OPTIONS} -DAMQP-CPP_BUILD_EXAMPLES=ON -DAMQP-CPP_LINUX_TCP=ON -GNinja .. &&
cmake --build . --config Release && cmake --config Release --build . &&
cd .." cd .."

View File

@ -82,14 +82,14 @@ The CMake file supports both building and installing. You can choose not to use
``` bash ``` bash
mkdir build mkdir build
cd build cd build
cmake .. [-DBUILD_SHARED] [-DLINUX_TCP] cmake .. [-DAMQP-CPP_AMQBUILD_SHARED] [-DAMQP-CPP_LINUX_TCP]
cmake --build .. --target install cmake --build .. --target install
``` ```
Option|Default|Meaning Option|Default|Meaning
------|-------|------- ------|-------|-------
BUILD_SHARED|OFF|Static lib(ON) or shared lib(OFF)? Shared is not supported on Windows. AMQP-CPP_BUILD_SHARED|OFF|Static lib(ON) or shared lib(OFF)? Shared is not supported on Windows.
LINUX_TCP|OFF|Should the Linux-only TCP module be built? AMQP-CPP_LINUX_TCP|OFF|Should the Linux-only TCP module be built?
## Make ## Make
Installing the library is as easy Installing the library is as easy

View File

@ -1,7 +1,7 @@
/** /**
* Class describing a (mid-level) AMQP channel implementation * Class describing a (mid-level) AMQP channel implementation
* *
* @copyright 2014 - 2017 Copernica BV * @copyright 2014 - 2018 Copernica BV
*/ */
/** /**
@ -170,6 +170,7 @@ public:
* - durable exchange survives a broker restart * - durable exchange survives a broker restart
* - autodelete exchange is automatically removed when all connected queues are removed * - autodelete exchange is automatically removed when all connected queues are removed
* - passive only check if the exchange exist * - passive only check if the exchange exist
* - internal create an internal exchange
* *
* @param name name of the exchange * @param name name of the exchange
* @param type exchange type * @param type exchange type

View File

@ -5,7 +5,7 @@
* that has a private constructor so that it can not be used from outside * that has a private constructor so that it can not be used from outside
* the AMQP library * the AMQP library
* *
* @copyright 2014 - 2017 Copernica BV * @copyright 2014 - 2018 Copernica BV
*/ */
/** /**
@ -122,7 +122,8 @@ private:
std::queue<std::pair<bool, CopiedBuffer>> _queue; std::queue<std::pair<bool, CopiedBuffer>> _queue;
/** /**
* Are we currently operating in synchronous mode? * Are we currently operating in synchronous mode? Meaning: do we first have
* to wait for the answer to previous instructions before we send a new instruction?
* @var bool * @var bool
*/ */
bool _synchronous = false; bool _synchronous = false;
@ -567,6 +568,9 @@ public:
// if we are still in connected state we are now ready // if we are still in connected state we are now ready
if (_state == state_connected) _state = state_ready; if (_state == state_connected) _state = state_ready;
// the last (possibly synchronous) operation was received, so we're no longer in synchronous mode
if (_synchronous && _queue.empty()) _synchronous = false;
// inform handler // inform handler
if (_readyCallback) _readyCallback(); if (_readyCallback) _readyCallback();
@ -586,7 +590,6 @@ public:
{ {
// change state // change state
_state = state_closed; _state = state_closed;
_synchronous = false;
// create a monitor, because the callbacks could destruct the current object // create a monitor, because the callbacks could destruct the current object
Monitor monitor(this); Monitor monitor(this);
@ -620,6 +623,9 @@ public:
{ {
// skip if there is no oldest callback // skip if there is no oldest callback
if (!_oldestCallback) return true; if (!_oldestCallback) return true;
// the last (possibly synchronous) operation was received, so we're no longer in synchronous mode
if (_synchronous && _queue.empty()) _synchronous = false;
// we are going to call callbacks that could destruct the channel // we are going to call callbacks that could destruct the channel
Monitor monitor(this); Monitor monitor(this);

View File

@ -3,7 +3,7 @@
* *
* The various flags that are supported * The various flags that are supported
* *
* @copyright 2014 Copernica BV * @copyright 2014 - 2018 Copernica BV
*/ */
/** /**
@ -38,6 +38,7 @@ extern const int multiple;
extern const int requeue; extern const int requeue;
extern const int readable; extern const int readable;
extern const int writable; extern const int writable;
extern const int internal;
/** /**
* End of namespace * End of namespace

View File

@ -47,7 +47,7 @@ namespace AMQP {
*/ */
class LibBoostAsioHandler : public virtual TcpHandler class LibBoostAsioHandler : public virtual TcpHandler
{ {
private: protected:
/** /**
* Helper class that wraps a boost io_service socket monitor. * Helper class that wraps a boost io_service socket monitor.
@ -62,7 +62,7 @@ private:
*/ */
boost::asio::io_service & _ioservice; boost::asio::io_service & _ioservice;
typedef std::weak_ptr<boost::asio::io_service::strand> strand_weak_ptr; using strand_weak_ptr = std::weak_ptr<boost::asio::io_service::strand>;
/** /**
* The boost asio io_service::strand managed pointer. * The boost asio io_service::strand managed pointer.
@ -77,7 +77,6 @@ private:
*/ */
boost::asio::posix::stream_descriptor _socket; boost::asio::posix::stream_descriptor _socket;
/** /**
* A boolean that indicates if the watcher is monitoring for read events. * A boolean that indicates if the watcher is monitoring for read events.
* @var _read True if reads are being monitored else false. * @var _read True if reads are being monitored else false.
@ -315,7 +314,7 @@ private:
*/ */
boost::asio::io_service & _ioservice; boost::asio::io_service & _ioservice;
typedef std::weak_ptr<boost::asio::io_service::strand> strand_weak_ptr; using strand_weak_ptr = std::weak_ptr<boost::asio::io_service::strand>;
/** /**
* The boost asio io_service::strand managed pointer. * The boost asio io_service::strand managed pointer.
@ -330,6 +329,7 @@ private:
boost::asio::deadline_timer _timer; boost::asio::deadline_timer _timer;
using handler_fn = boost::function<void(boost::system::error_code)>; using handler_fn = boost::function<void(boost::system::error_code)>;
/** /**
* Binds and returns a lamba function handler for the io operation. * Binds and returns a lamba function handler for the io operation.
* @param connection The connection being watched. * @param connection The connection being watched.
@ -457,7 +457,7 @@ private:
*/ */
boost::asio::io_service & _ioservice; boost::asio::io_service & _ioservice;
typedef std::shared_ptr<boost::asio::io_service::strand> strand_shared_ptr; using strand_shared_ptr = std::shared_ptr<boost::asio::io_service::strand>;
/** /**
* The boost asio io_service::strand managed pointer. * The boost asio io_service::strand managed pointer.
@ -465,17 +465,18 @@ private:
*/ */
strand_shared_ptr _strand; strand_shared_ptr _strand;
/** /**
* All I/O watchers that are active, indexed by their filedescriptor * All I/O watchers that are active, indexed by their filedescriptor
* @var std::map<int,Watcher> * @var std::map<int,Watcher>
*/ */
std::map<int, std::shared_ptr<Watcher> > _watchers; std::map<int, std::shared_ptr<Watcher> > _watchers;
/**
* The boost asio io_service::deadline_timer managed pointer.
* @var class std::shared_ptr<Timer>
*/
std::shared_ptr<Timer> _timer; std::shared_ptr<Timer> _timer;
/** /**
* Method that is called by AMQP-CPP to register a filedescriptor for readability or writability * Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
* @param connection The TCP connection object that is reporting * @param connection The TCP connection object that is reporting
@ -584,4 +585,3 @@ public:
* End of namespace * End of namespace
*/ */
} }

View File

@ -3,7 +3,7 @@
* *
* Implementation for a channel * Implementation for a channel
* *
* @copyright 2014 - 2017 Copernica BV * @copyright 2014 - 2018 Copernica BV
*/ */
#include "includes.h" #include "includes.h"
#include "basicdeliverframe.h" #include "basicdeliverframe.h"
@ -252,7 +252,7 @@ Deferred &ChannelImpl::close()
Deferred &ChannelImpl::declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) Deferred &ChannelImpl::declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments)
{ {
// convert exchange type // convert exchange type
std::string exchangeType; const char *exchangeType = "";
// convert the exchange type into a string // convert the exchange type into a string
if (type == ExchangeType::fanout) exchangeType = "fanout"; if (type == ExchangeType::fanout) exchangeType = "fanout";
@ -261,8 +261,15 @@ Deferred &ChannelImpl::declareExchange(const std::string &name, ExchangeType typ
else if (type == ExchangeType::headers) exchangeType = "headers"; else if (type == ExchangeType::headers) exchangeType = "headers";
else if (type == ExchangeType::consistent_hash) exchangeType = "x-consistent-hash"; else if (type == ExchangeType::consistent_hash) exchangeType = "x-consistent-hash";
// the boolean options
bool passive = flags & AMQP::passive;
bool durable = flags & AMQP::durable;
bool autodelete = flags & AMQP::autodelete;
bool internal = flags & AMQP::internal;
bool nowait = flags & AMQP::nowait;
// send declare exchange frame // send declare exchange frame
return push(ExchangeDeclareFrame(_id, name, exchangeType, (flags & passive) != 0, (flags & durable) != 0, false, arguments)); return push(ExchangeDeclareFrame(_id, name, exchangeType, passive, durable, autodelete, internal, nowait, arguments));
} }
/** /**
@ -686,7 +693,7 @@ bool ChannelImpl::send(const Frame &frame)
// added to the list of deferred objects. it will be notified about // added to the list of deferred objects. it will be notified about
// the error when the close operation succeeds // the error when the close operation succeeds
if (_state == state_closing) return true; if (_state == state_closing) return true;
// are we currently in synchronous mode or are there // are we currently in synchronous mode or are there
// other frames waiting for their turn to be sent? // other frames waiting for their turn to be sent?
if (_synchronous || !_queue.empty()) if (_synchronous || !_queue.empty())

View File

@ -1,7 +1,7 @@
/** /**
* Class describing an AMQP exchange declare frame * Class describing an AMQP exchange declare frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 - 2018 Copernica BV
*/ */
/** /**
@ -77,16 +77,17 @@ public:
* @param type exchange type * @param type exchange type
* @param passive do not create exchange if it does not exist * @param passive do not create exchange if it does not exist
* @param durable durable exchange * @param durable durable exchange
* @param autodelete is this an auto-delete exchange?
* @param internal is this an internal exchange
* @param noWait do not wait on response * @param noWait do not wait on response
* @param arguments additional arguments * @param arguments additional arguments
*/ */
ExchangeDeclareFrame(uint16_t channel, const std::string& name, const std::string& type, bool passive, bool durable, bool noWait, const Table& arguments) : ExchangeDeclareFrame(uint16_t channel, const std::string& name, const char *type, bool passive, bool durable, bool autodelete, bool internal, bool nowait, const Table& arguments) :
ExchangeFrame(channel, (uint32_t)(name.length() + type.length() + arguments.size() + 5)), // size of name, type and arguments + 1 (all booleans are stored in 1 byte) + 2 (deprecated short) + 2 (string sizes) ExchangeFrame(channel, (uint32_t)(name.length() + strlen(type) + arguments.size() + 5)), // size of name, type and arguments + 1 (all booleans are stored in 1 byte) + 2 (deprecated short) + 2 (string sizes)
_name(name), _name(name),
_type(type), _type(type),
_bools(passive, durable, false, false, noWait), _bools(passive, durable, autodelete, internal, nowait),
_arguments(arguments) _arguments(arguments) {}
{}
/** /**
* Construct parsing a declare frame from a received frame * Construct parsing a declare frame from a received frame
@ -162,6 +163,24 @@ public:
{ {
return _bools.get(1); return _bools.get(1);
} }
/**
* Is this an autodelete exchange?
* @return bool
*/
bool autoDelete() const
{
return _bools.get(2);
}
/**
* Is this an internal exchange?
* @return bool
*/
bool internal() const
{
return _bools.get(3);
}
/** /**
* Do not wait for a response * Do not wait for a response

View File

@ -3,7 +3,7 @@
* *
* The various flags that are supported * The various flags that are supported
* *
* @copyright 2014 Copernica BV * @copyright 2014 - 2018 Copernica BV
*/ */
#include "includes.h" #include "includes.h"
@ -32,6 +32,7 @@ const int immediate = 0x1000;
const int redelivered = 0x2000; const int redelivered = 0x2000;
const int multiple = 0x4000; const int multiple = 0x4000;
const int requeue = 0x8000; const int requeue = 0x8000;
const int internal = 0x10000;
/** /**
* Flags for event loops * Flags for event loops