From 5269f51a92daf6a89be34195bc98f8d46b0d7c01 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Sun, 5 Jan 2014 05:19:35 -0800 Subject: [PATCH] Implemented setting the quality of service (and it turns out that the prefetch size is not implemented in rabbitMQ, nor is setting the qos for the entire connection, so we have only implemented it for a channel) --- include/channel.h | 10 +++++++ include/channelhandler.h | 6 +++++ include/channelimpl.h | 15 +++++++++++ include/connection.h | 11 -------- include/connectionimpl.h | 8 ------ src/basicqosframe.h | 55 ++++++++++++++++++++------------------ src/basicqosokframe.cpp | 40 +++++++++++++++++++++++++++ src/basicqosokframe.h | 8 ++++++ src/channelflowokframe.cpp | 2 +- src/channelimpl.cpp | 15 +++++++++++ src/connectionimpl.cpp | 14 ++-------- tests/main.cpp | 24 ++++++++++++----- tests/myconnection.cpp | 21 ++++++++++++--- tests/myconnection.h | 8 +++++- 14 files changed, 169 insertions(+), 68 deletions(-) create mode 100644 src/basicqosokframe.cpp diff --git a/include/channel.h b/include/channel.h index 8f4ac2e..8372223 100644 --- a/include/channel.h +++ b/include/channel.h @@ -272,6 +272,16 @@ public: bool publish(const std::string &exchange, const std::string &routingKey, int flags, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, flags, Envelope(message, size)); } bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, 0, Envelope(message, size)); } + /** + * Set the Quality of Service (QOS) for this channel + * @param prefetchCount maximum number of messages to prefetch + * @return bool whether the Qos frame is sent. + */ + bool setQos(uint16_t prefetchCount) + { + return _implementation.setQos(prefetchCount); + } + /** * Close the current channel * @return bool diff --git a/include/channelhandler.h b/include/channelhandler.h index 08d9dde..6fbbdf1 100644 --- a/include/channelhandler.h +++ b/include/channelhandler.h @@ -144,6 +144,12 @@ public: */ virtual void onQueuePurged(Channel *channel, uint32_t messageCount) {} + /** + * Method that is called when the quality-of-service was changed + * This is the result of a call to Channel::setQos() + */ + virtual void onQosSet(Channel *channel) {} + }; /** diff --git a/include/channelimpl.h b/include/channelimpl.h index a3198be..13deb6f 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -229,6 +229,13 @@ public: */ bool publish(const std::string &exchange, const std::string &routingKey, int flags, const Envelope &envelope); + /** + * Set the Quality of Service (QOS) of the entire connection + * @param prefetchCount maximum number of messages to prefetch + * @return bool whether the Qos frame is sent. + */ + bool setQos(uint16_t prefetchCount); + /** * Close the current channel * @return bool @@ -380,6 +387,14 @@ public: if (_handler) _handler->onQueuePurged(_parent, messageCount); } + /** + * Report that the qos has been set + */ + void reportQosSet() + { + if (_handler) _handler->onQosSet(_parent); + } + /** * The channel class is its friend, thus can it instantiate this object */ diff --git a/include/connection.h b/include/connection.h index 9a7af4f..64298a8 100644 --- a/include/connection.h +++ b/include/connection.h @@ -89,17 +89,6 @@ public: return _implementation.close(); } - /** - * Set the Quality of Service (QOS) of the entire connection - * @param prefetchSize maximum size (in octets) of messages to be prefetched - * @param prefetchCount maximum number of messages to prefetch - * @return bool whether the Qos frame is sent. - */ - bool setQos(uint32_t prefetchSize, uint16_t prefetchCount) - { - return _implementation.setQos(prefetchSize, prefetchCount); - } - /** * Some classes have access to private properties */ diff --git a/include/connectionimpl.h b/include/connectionimpl.h index 938bdca..0d6e9a9 100644 --- a/include/connectionimpl.h +++ b/include/connectionimpl.h @@ -275,14 +275,6 @@ public: // @Todo: notify all channels of closed connection } - /** - * Set the Quality of Service (QOS) of the entire connection - * @param prefetchSize maximum size (in octets) of messages to be prefetched - * @param prefetchCount maximum number of messages to prefetch - * @return bool whether the Qos frame is sent. - */ - bool setQos(uint32_t prefetchSize, uint16_t prefetchCount); - /** * The actual connection is a friend and can construct this class */ diff --git a/src/basicqosframe.h b/src/basicqosframe.h index f89aebd..6bf3049 100644 --- a/src/basicqosframe.h +++ b/src/basicqosframe.h @@ -15,28 +15,28 @@ namespace AMQP { class BasicQosFrame : public BasicFrame { private: - /** - * specifies the size of the prefetch window in octets - * @var int32_t - */ - int32_t _prefetchSize; + /** + * specifies the size of the prefetch window in octets + * @var int32_t + */ + int32_t _prefetchSize; - /** - * specifies a prefetch window in terms of whole messages - * @var int16_t - */ - int16_t _prefetchCount; + /** + * specifies a prefetch window in terms of whole messages + * @var int16_t + */ + int16_t _prefetchCount; + + /** + * apply QoS settings to entire connection + * @var BooleanSet + */ + BooleanSet _global; - /** - * apply QoS settings to entire connection - * @var BooleanSet - */ - BooleanSet _global; protected: /** - * Encode a frame on a string buffer - * - * @param buffer buffer to write frame to + * Encode a frame on a string buffer + * @param buffer buffer to write frame to */ virtual void fill(OutBuffer& buffer) const override { @@ -50,27 +50,25 @@ protected: } public: - /** - * Destructor - */ - virtual ~BasicQosFrame() {} /** * Construct a basic qos frame * * @param channel channel we're working on - * @param prefetchSize specifies the size of the prefetch window in octets * @param prefetchCount specifies a prefetch window in terms of whole messages * @param global apply QoS settings to entire connection * @default false */ - BasicQosFrame(uint16_t channel, int32_t prefetchSize = 0, int16_t prefetchCount = 0, bool global = false) : + BasicQosFrame(uint16_t channel, int16_t prefetchCount = 0, bool global = false) : BasicFrame(channel, 7), // 4 (int32) + 2 (int16) + 1 (bool) - _prefetchSize(prefetchSize), + _prefetchSize(0), _prefetchCount(prefetchCount), _global(global) {} - + /** + * Constructor based on incoming frame + * @param frame + */ BasicQosFrame(ReceivedFrame &frame) : BasicFrame(frame), _prefetchSize(frame.nextInt32()), @@ -78,6 +76,11 @@ public: _global(frame) {} + /** + * Destructor + */ + virtual ~BasicQosFrame() {} + /** * Return the method ID * @return uint16_t diff --git a/src/basicqosokframe.cpp b/src/basicqosokframe.cpp new file mode 100644 index 0000000..3394cb9 --- /dev/null +++ b/src/basicqosokframe.cpp @@ -0,0 +1,40 @@ +/** + * BasicQosOkFrame.cpp + * + * @copyright 2014 Copernica BV + */ +#include "includes.h" +#include "basicqosokframe.h" + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ +bool BasicQosOKFrame::process(ConnectionImpl *connection) +{ + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if (!channel) return false; + + // report + channel->reportQosSet(); + + // done + return true; + + +} + +/** + * End of namespace + */ +} + diff --git a/src/basicqosokframe.h b/src/basicqosokframe.h index 0481cbb..be1dd7c 100644 --- a/src/basicqosokframe.h +++ b/src/basicqosokframe.h @@ -52,6 +52,14 @@ public: { return 11; } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection); + }; /** diff --git a/src/channelflowokframe.cpp b/src/channelflowokframe.cpp index ef6d722..c933d09 100644 --- a/src/channelflowokframe.cpp +++ b/src/channelflowokframe.cpp @@ -22,7 +22,7 @@ bool ChannelFlowOKFrame::process(ConnectionImpl *connection) ChannelImpl *channel = connection->channel(this->channel()); // channel does not exist - if(!channel) return false; + if (!channel) return false; // is the flow active? if (active()) channel->reportResumed(); diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 62865e2..c447e31 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -24,6 +24,7 @@ #include "basicpublishframe.h" #include "basicheaderframe.h" #include "bodyframe.h" +#include "basicqosframe.h" #include @@ -421,6 +422,20 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin return true; } +/** + * Set the Quality of Service (QOS) for this channel + * @param prefetchCount maximum number of messages to prefetch + * @return bool whether the Qos frame is sent. + */ +bool ChannelImpl::setQos(uint16_t prefetchCount) +{ + // set for the entire connection + send(BasicQosFrame(_id, prefetchCount, false)); + + // done + return true; +} + /** * Send a frame over the channel * @param frame frame to send diff --git a/src/connectionimpl.cpp b/src/connectionimpl.cpp index 0e56fa2..8dbb101 100644 --- a/src/connectionimpl.cpp +++ b/src/connectionimpl.cpp @@ -242,17 +242,7 @@ size_t ConnectionImpl::send(const Frame &frame) } /** - * Set the Quality of Service (QOS) of the entire connection - * @param prefetchSize maximum size (in octets) of messages to be prefetched - * @param prefetchCount maximum number of messages to prefetch - * @return bool whether the Qos frame is sent. + * End of namspace */ -//bool Connection::setQOS(uint32_t prefetchSize = 0, uint16_t prefetchCount = 0) -//{ -// BasicQosFrame *frame = new BasicQosFrame(0, prefetchSize, prefetchCount, true); -// if(_connection->send(frame->buffer(), frame->totalSize()) != frame->totalSize()) return false; -// -// return true; -//} +} -} \ No newline at end of file diff --git a/tests/main.cpp b/tests/main.cpp index 5b4025e..3388dc9 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -31,13 +31,25 @@ using namespace Copernica; */ int main(int argc, const char *argv[]) { - // create connection - MyConnection connection; + // need an ip + if (argc != 2) + { + // report error + std::cerr << "usage: " << argv[0] << " " << std::endl; + + // done + return -1; + } + else + { + // create connection + MyConnection connection(argv[1]); - // start the main event loop - Event::MainLoop::instance()->run(); + // start the main event loop + Event::MainLoop::instance()->run(); - // done - return 0; + // done + return 0; + } } diff --git a/tests/myconnection.cpp b/tests/myconnection.cpp index 17d1c2f..76b6baf 100644 --- a/tests/myconnection.cpp +++ b/tests/myconnection.cpp @@ -26,13 +26,16 @@ using namespace Copernica; /** * Constructor */ -MyConnection::MyConnection() : +MyConnection::MyConnection(const std::string &ip) : _socket(Event::MainLoop::instance(), this), _connection(nullptr), _channel(nullptr) { // start connecting - _socket.connect(Network::Ipv4Address("127.0.0.1"), 5672); + if (_socket.connect(Network::Ipv4Address(ip), 5672)) return; + + // failure + onFailure(&_socket); } /** @@ -352,6 +355,9 @@ void MyConnection::onQueueBound(AMQP::Channel *channel) // show std::cout << "AMQP Queue bound" << std::endl; + _connection->setQos(10); +// _channel->setQos(10); + _channel->publish("my_exchange", "key", "this is the message"); } @@ -385,6 +391,15 @@ void MyConnection::onQueuePurged(AMQP::Channel *channel, uint32_t messageCount) { // show std::cout << "AMQP Queue purged" << std::endl; - channel->removeQueue("testqueue"); +} + +/** + * Method that is called when the quality-of-service was changed + * This is the result of a call to Channel::setQos() + */ +void MyConnection::onQosSet(AMQP::Channel *channel) +{ + // show + std::cout << "AMQP Qos set" << std::endl; } diff --git a/tests/myconnection.h b/tests/myconnection.h index d5662d6..fcb0ef2 100644 --- a/tests/myconnection.h +++ b/tests/myconnection.h @@ -212,13 +212,19 @@ private: */ virtual void onQueuePurged(AMQP::Channel *channel, uint32_t messageCount); + /** + * Method that is called when the quality-of-service was changed + * This is the result of a call to Channel::setQos() + */ + virtual void onQosSet(AMQP::Channel *channel); public: /** * Constructor + * @param ip */ - MyConnection(); + MyConnection(const std::string &ip); /** * Destructor