diff --git a/include/channel.h b/include/channel.h index 8f4ac2e..8372223 100644 --- a/include/channel.h +++ b/include/channel.h @@ -272,6 +272,16 @@ public: bool publish(const std::string &exchange, const std::string &routingKey, int flags, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, flags, Envelope(message, size)); } bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, 0, Envelope(message, size)); } + /** + * Set the Quality of Service (QOS) for this channel + * @param prefetchCount maximum number of messages to prefetch + * @return bool whether the Qos frame is sent. + */ + bool setQos(uint16_t prefetchCount) + { + return _implementation.setQos(prefetchCount); + } + /** * Close the current channel * @return bool diff --git a/include/channelhandler.h b/include/channelhandler.h index 08d9dde..6fbbdf1 100644 --- a/include/channelhandler.h +++ b/include/channelhandler.h @@ -144,6 +144,12 @@ public: */ virtual void onQueuePurged(Channel *channel, uint32_t messageCount) {} + /** + * Method that is called when the quality-of-service was changed + * This is the result of a call to Channel::setQos() + */ + virtual void onQosSet(Channel *channel) {} + }; /** diff --git a/include/channelimpl.h b/include/channelimpl.h index a3198be..13deb6f 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -229,6 +229,13 @@ public: */ bool publish(const std::string &exchange, const std::string &routingKey, int flags, const Envelope &envelope); + /** + * Set the Quality of Service (QOS) of the entire connection + * @param prefetchCount maximum number of messages to prefetch + * @return bool whether the Qos frame is sent. + */ + bool setQos(uint16_t prefetchCount); + /** * Close the current channel * @return bool @@ -380,6 +387,14 @@ public: if (_handler) _handler->onQueuePurged(_parent, messageCount); } + /** + * Report that the qos has been set + */ + void reportQosSet() + { + if (_handler) _handler->onQosSet(_parent); + } + /** * The channel class is its friend, thus can it instantiate this object */ diff --git a/include/connection.h b/include/connection.h index 9a7af4f..64298a8 100644 --- a/include/connection.h +++ b/include/connection.h @@ -89,17 +89,6 @@ public: return _implementation.close(); } - /** - * Set the Quality of Service (QOS) of the entire connection - * @param prefetchSize maximum size (in octets) of messages to be prefetched - * @param prefetchCount maximum number of messages to prefetch - * @return bool whether the Qos frame is sent. - */ - bool setQos(uint32_t prefetchSize, uint16_t prefetchCount) - { - return _implementation.setQos(prefetchSize, prefetchCount); - } - /** * Some classes have access to private properties */ diff --git a/include/connectionimpl.h b/include/connectionimpl.h index 938bdca..0d6e9a9 100644 --- a/include/connectionimpl.h +++ b/include/connectionimpl.h @@ -275,14 +275,6 @@ public: // @Todo: notify all channels of closed connection } - /** - * Set the Quality of Service (QOS) of the entire connection - * @param prefetchSize maximum size (in octets) of messages to be prefetched - * @param prefetchCount maximum number of messages to prefetch - * @return bool whether the Qos frame is sent. - */ - bool setQos(uint32_t prefetchSize, uint16_t prefetchCount); - /** * The actual connection is a friend and can construct this class */ diff --git a/src/basicqosframe.h b/src/basicqosframe.h index f89aebd..6bf3049 100644 --- a/src/basicqosframe.h +++ b/src/basicqosframe.h @@ -15,28 +15,28 @@ namespace AMQP { class BasicQosFrame : public BasicFrame { private: - /** - * specifies the size of the prefetch window in octets - * @var int32_t - */ - int32_t _prefetchSize; + /** + * specifies the size of the prefetch window in octets + * @var int32_t + */ + int32_t _prefetchSize; - /** - * specifies a prefetch window in terms of whole messages - * @var int16_t - */ - int16_t _prefetchCount; + /** + * specifies a prefetch window in terms of whole messages + * @var int16_t + */ + int16_t _prefetchCount; + + /** + * apply QoS settings to entire connection + * @var BooleanSet + */ + BooleanSet _global; - /** - * apply QoS settings to entire connection - * @var BooleanSet - */ - BooleanSet _global; protected: /** - * Encode a frame on a string buffer - * - * @param buffer buffer to write frame to + * Encode a frame on a string buffer + * @param buffer buffer to write frame to */ virtual void fill(OutBuffer& buffer) const override { @@ -50,27 +50,25 @@ protected: } public: - /** - * Destructor - */ - virtual ~BasicQosFrame() {} /** * Construct a basic qos frame * * @param channel channel we're working on - * @param prefetchSize specifies the size of the prefetch window in octets * @param prefetchCount specifies a prefetch window in terms of whole messages * @param global apply QoS settings to entire connection * @default false */ - BasicQosFrame(uint16_t channel, int32_t prefetchSize = 0, int16_t prefetchCount = 0, bool global = false) : + BasicQosFrame(uint16_t channel, int16_t prefetchCount = 0, bool global = false) : BasicFrame(channel, 7), // 4 (int32) + 2 (int16) + 1 (bool) - _prefetchSize(prefetchSize), + _prefetchSize(0), _prefetchCount(prefetchCount), _global(global) {} - + /** + * Constructor based on incoming frame + * @param frame + */ BasicQosFrame(ReceivedFrame &frame) : BasicFrame(frame), _prefetchSize(frame.nextInt32()), @@ -78,6 +76,11 @@ public: _global(frame) {} + /** + * Destructor + */ + virtual ~BasicQosFrame() {} + /** * Return the method ID * @return uint16_t diff --git a/src/basicqosokframe.cpp b/src/basicqosokframe.cpp new file mode 100644 index 0000000..3394cb9 --- /dev/null +++ b/src/basicqosokframe.cpp @@ -0,0 +1,40 @@ +/** + * BasicQosOkFrame.cpp + * + * @copyright 2014 Copernica BV + */ +#include "includes.h" +#include "basicqosokframe.h" + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ +bool BasicQosOKFrame::process(ConnectionImpl *connection) +{ + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if (!channel) return false; + + // report + channel->reportQosSet(); + + // done + return true; + + +} + +/** + * End of namespace + */ +} + diff --git a/src/basicqosokframe.h b/src/basicqosokframe.h index 0481cbb..be1dd7c 100644 --- a/src/basicqosokframe.h +++ b/src/basicqosokframe.h @@ -52,6 +52,14 @@ public: { return 11; } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection); + }; /** diff --git a/src/channelflowokframe.cpp b/src/channelflowokframe.cpp index ef6d722..c933d09 100644 --- a/src/channelflowokframe.cpp +++ b/src/channelflowokframe.cpp @@ -22,7 +22,7 @@ bool ChannelFlowOKFrame::process(ConnectionImpl *connection) ChannelImpl *channel = connection->channel(this->channel()); // channel does not exist - if(!channel) return false; + if (!channel) return false; // is the flow active? if (active()) channel->reportResumed(); diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 62865e2..c447e31 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -24,6 +24,7 @@ #include "basicpublishframe.h" #include "basicheaderframe.h" #include "bodyframe.h" +#include "basicqosframe.h" #include @@ -421,6 +422,20 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin return true; } +/** + * Set the Quality of Service (QOS) for this channel + * @param prefetchCount maximum number of messages to prefetch + * @return bool whether the Qos frame is sent. + */ +bool ChannelImpl::setQos(uint16_t prefetchCount) +{ + // set for the entire connection + send(BasicQosFrame(_id, prefetchCount, false)); + + // done + return true; +} + /** * Send a frame over the channel * @param frame frame to send diff --git a/src/connectionimpl.cpp b/src/connectionimpl.cpp index 0e56fa2..8dbb101 100644 --- a/src/connectionimpl.cpp +++ b/src/connectionimpl.cpp @@ -242,17 +242,7 @@ size_t ConnectionImpl::send(const Frame &frame) } /** - * Set the Quality of Service (QOS) of the entire connection - * @param prefetchSize maximum size (in octets) of messages to be prefetched - * @param prefetchCount maximum number of messages to prefetch - * @return bool whether the Qos frame is sent. + * End of namspace */ -//bool Connection::setQOS(uint32_t prefetchSize = 0, uint16_t prefetchCount = 0) -//{ -// BasicQosFrame *frame = new BasicQosFrame(0, prefetchSize, prefetchCount, true); -// if(_connection->send(frame->buffer(), frame->totalSize()) != frame->totalSize()) return false; -// -// return true; -//} +} -} \ No newline at end of file diff --git a/tests/main.cpp b/tests/main.cpp index 5b4025e..3388dc9 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -31,13 +31,25 @@ using namespace Copernica; */ int main(int argc, const char *argv[]) { - // create connection - MyConnection connection; + // need an ip + if (argc != 2) + { + // report error + std::cerr << "usage: " << argv[0] << " " << std::endl; + + // done + return -1; + } + else + { + // create connection + MyConnection connection(argv[1]); - // start the main event loop - Event::MainLoop::instance()->run(); + // start the main event loop + Event::MainLoop::instance()->run(); - // done - return 0; + // done + return 0; + } } diff --git a/tests/myconnection.cpp b/tests/myconnection.cpp index 17d1c2f..76b6baf 100644 --- a/tests/myconnection.cpp +++ b/tests/myconnection.cpp @@ -26,13 +26,16 @@ using namespace Copernica; /** * Constructor */ -MyConnection::MyConnection() : +MyConnection::MyConnection(const std::string &ip) : _socket(Event::MainLoop::instance(), this), _connection(nullptr), _channel(nullptr) { // start connecting - _socket.connect(Network::Ipv4Address("127.0.0.1"), 5672); + if (_socket.connect(Network::Ipv4Address(ip), 5672)) return; + + // failure + onFailure(&_socket); } /** @@ -352,6 +355,9 @@ void MyConnection::onQueueBound(AMQP::Channel *channel) // show std::cout << "AMQP Queue bound" << std::endl; + _connection->setQos(10); +// _channel->setQos(10); + _channel->publish("my_exchange", "key", "this is the message"); } @@ -385,6 +391,15 @@ void MyConnection::onQueuePurged(AMQP::Channel *channel, uint32_t messageCount) { // show std::cout << "AMQP Queue purged" << std::endl; - channel->removeQueue("testqueue"); +} + +/** + * Method that is called when the quality-of-service was changed + * This is the result of a call to Channel::setQos() + */ +void MyConnection::onQosSet(AMQP::Channel *channel) +{ + // show + std::cout << "AMQP Qos set" << std::endl; } diff --git a/tests/myconnection.h b/tests/myconnection.h index d5662d6..fcb0ef2 100644 --- a/tests/myconnection.h +++ b/tests/myconnection.h @@ -212,13 +212,19 @@ private: */ virtual void onQueuePurged(AMQP::Channel *channel, uint32_t messageCount); + /** + * Method that is called when the quality-of-service was changed + * This is the result of a call to Channel::setQos() + */ + virtual void onQosSet(AMQP::Channel *channel); public: /** * Constructor + * @param ip */ - MyConnection(); + MyConnection(const std::string &ip); /** * Destructor