diff --git a/Makefile b/Makefile index f341d6c..c09ddd9 100644 --- a/Makefile +++ b/Makefile @@ -5,6 +5,12 @@ LIBRARY_DIR = ${PREFIX}/lib all: $(MAKE) -C src all +static: + $(MAKE) -C src static + +shared: + $(MAKE) -C src shared + clean: $(MAKE) -C src clean diff --git a/include/channel.h b/include/channel.h index 8372223..1aecbda 100644 --- a/include/channel.h +++ b/include/channel.h @@ -274,6 +274,13 @@ public: /** * Set the Quality of Service (QOS) for this channel + * + * When you consume messages, every single messages needs to be ack'ed to inform + * the RabbitMQ server that is has been received. The Qos setting specifies the + * number of unacked messages that may exist in the client application. The server + * stops delivering more messages if the number of unack'ed messages has reached + * the prefetchCount + * * @param prefetchCount maximum number of messages to prefetch * @return bool whether the Qos frame is sent. */ @@ -282,6 +289,58 @@ public: return _implementation.setQos(prefetchCount); } + /** + * Tell the RabbitMQ server that we're ready to consume messages + * + * After this method is called, RabbitMQ starts delivering messages to the client + * application. The consume tag is a string identifier that will be passed to + * each received message, so that you can associate incoming messages with a + * consumer. If you do not specify a consumer tag, the server will assign one + * for you. + * + * The following flags are supported: + * + * - nolocal if set, messages published on this channel are not also consumed + * - noack if set, consumed messages do not have to be acked, this happens automatically + * - exclusive request exclusive access, only this consumer can access the queue + * - nowait the server does not have to send a response back that consuming is active + * + * The method ChannelHandler::onConsumerStarted() will be called when the + * consumer has started (unless the nowait option was set, in which case + * no confirmation method is called) + * + * @param queue the queue from which you want to consume + * @param tag a consumer tag that will be associated with this consume operation + * @param flags additional flags + * @param arguments additional arguments + * @return bool + */ + bool consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments) { return _implementation.consume(queue, tag, flags, arguments); } + bool consume(const std::string &queue, const std::string &tag, int flags = 0) { return _implementation.consume(queue, tag, flags, Table()); } + bool consume(const std::string &queue, const std::string &tag, const Table &arguments) { return _implementation.consume(queue, tag, 0, arguments); } + bool consume(const std::string &queue, int flags, const Table &arguments) { return _implementation.consume(queue, std::string(), flags, arguments); } + bool consume(const std::string &queue, int flags = 0) { return _implementation.consume(queue, std::string(), flags, Table()); } + bool consume(const std::string &queue, const Table &arguments) { return _implementation.consume(queue, std::string(), 0, arguments); } + + /** + * Cancel a running consume call + * + * If you want to stop a running consumer, you can use this method with the consumer tag + * + * The following flags are supported: + * + * - nowait the server does not have to send a response back that the consumer has been cancelled + * + * The method ChannelHandler::onConsumerStopped() will be called when the consumer + * was succesfully stopped (unless the nowait option was used, in which case no + * confirmation method is called) + * + * @param tag the consumer tag + * @param flags optional additional flags + * @return bool + */ + bool cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); } + /** * Close the current channel * @return bool diff --git a/include/channelhandler.h b/include/channelhandler.h index 6fbbdf1..97c4921 100644 --- a/include/channelhandler.h +++ b/include/channelhandler.h @@ -23,91 +23,92 @@ class ChannelHandler public: /** * Method that is called when the channel was succesfully created. - * @param channel + * @param channel the channel that is ready */ virtual void onReady(Channel *channel) {} /** * An error has occured on the channel - * @param channel - * @param message + * The channel is no longer usable after an error has occured on it + * @param channel the channel on which the error occured + * @param message human readable error message */ virtual void onError(Channel *channel, const std::string &message) {} /** * Method that is called when the channel was paused * This is the result of a call to Channel::pause() - * @param channel + * @param channel the channel that is now paused */ virtual void onPaused(Channel *channel) {} /** * Method that is called when the channel was resumed * This is the result of a call to Channel::resume() - * @param channel + * @param channel the channel that is no longer paused */ virtual void onResumed(Channel *channel) {} /** * Method that is called when a channel is closed * This is the result of a call to Channel::close() - * @param channel + * @param channel the channel that is closed */ virtual void onClosed(Channel *channel) {} /** * Method that is called when a transaction was started * This is the result of a call to Channel::startTransaction() - * @param channel + * @param channel the channel on which the transaction was started */ virtual void onTransactionStarted(Channel *channel) {} /** * Method that is called when a transaction was committed * This is the result of a call to Channel::commitTransaction() - * @param channel + * @param channel the channel on which the transaction was committed */ virtual void onTransactionCommitted(Channel *channel) {} /** * Method that is called when a transaction was rolled back * This is the result of a call to Channel::rollbackTransaction() - * @param channel + * @param channel the channel on which the transaction was rolled back */ virtual void onTransactionRolledBack(Channel *channel) {} /** * Method that is called when an exchange is bound * This is the result of a call to Channel::bindExchange() - * @param channel + * @param channel the channel on which the exchange was bound */ virtual void onExchangeBound(Channel *channel) {} /** * Method that is called when an exchange is unbound * This is the result of a call to Channel::unbindExchange() - * @param channel + * @param channel the channel on which the exchange was unbound */ virtual void onExchangeUnbound(Channel *channel) {} /** * Method that is called when an exchange is deleted * This is the result of a call to Channel::deleteExchange() - * @param channel + * @param channel the channel on which the exchange was deleted */ virtual void onExchangeDeleted(Channel *channel) {} /** * Mehod that is called when an exchange is declared * This is the result of a call to Channel::declareExchange() - * @param channel + * @param channel the channel on which the exchange was declared */ virtual void onExchangeDeclared(Channel *channel) {} /** * Method that is called when a queue is declared * This is the result of a call to Channel::declareQueue() - * @param channel + * @param channel the channel on which the queue was declared * @param name name of the queue * @param messageCount number of messages in queue * @param consumerCount number of active consumers @@ -117,15 +118,14 @@ public: /** * Method that is called when a queue is bound * This is the result of a call to Channel::bindQueue() - * @param channel - * @param + * @param channel the channel on which the queue was bound */ virtual void onQueueBound(Channel *channel) {} /** * Method that is called when a queue is deleted * This is the result of a call to Channel::deleteQueue() - * @param channel + * @param channel the channel on which the queue was deleted * @param messageCount number of messages deleted along with the queue */ virtual void onQueueDeleted(Channel *channel, uint32_t messageCount) {} @@ -133,22 +133,47 @@ public: /** * Method that is called when a queue is unbound * This is the result of a call to Channel::unbindQueue() - * @param channel + * @param channel the channel on which the queue was unbound */ virtual void onQueueUnbound(Channel *channel) {} /** * Method that is called when a queue is purged * This is the result of a call to Channel::purgeQueue() - * @param messageCount number of message purged + * @param channel the channel on which the queue was emptied + * @param messageCount number of message purged */ virtual void onQueuePurged(Channel *channel, uint32_t messageCount) {} /** * Method that is called when the quality-of-service was changed * This is the result of a call to Channel::setQos() + * @param channel the channel on which the qos was set */ virtual void onQosSet(Channel *channel) {} + + /** + * Method that is called when a consumer was started + * This is the result of a call to Channel::consume() + * @param channel the channel on which the consumer was started + * @param tag the consumer tag + */ + virtual void onConsumerStarted(Channel *channel, const std::string &tag) {} + + /** + * Method that is called when a message has been consumed + * @param channel the channel on which the consumer was started + * @param message the consumed message + */ + virtual void onConsumed(Channel *channel, const Message &message) {} + + /** + * Method that is called when a consumer was stopped + * This is the result of a call to Channel::cancel() + * @param channel the channel on which the consumer was stopped + * @param tag the consumer tag + */ + virtual void onConsumerStopped(Channel *channel, const std::string &tag) {} }; diff --git a/include/channelimpl.h b/include/channelimpl.h index 13deb6f..dd1668d 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -57,6 +57,12 @@ private: * @var bool */ bool _transaction = false; + + /** + * The message that is now being received + * @var MessageImpl + */ + MessageImpl *_message = nullptr; /** * Construct a channel object @@ -236,6 +242,23 @@ public: */ bool setQos(uint16_t prefetchCount); + /** + * Tell the RabbitMQ server that we're ready to consume messages + * @param queue the queue from which you want to consume + * @param tag a consumer tag that will be associated with this consume operation + * @param flags additional flags + * @param arguments additional arguments + * @return bool + */ + bool consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments); + + /** + * Cancel a running consumer + * @param tag the consumer tag + * @param flags optional flags + */ + bool cancel(const std::string &tag, int flags); + /** * Close the current channel * @return bool @@ -395,6 +418,45 @@ public: if (_handler) _handler->onQosSet(_parent); } + /** + * Report that a consumer has started + * @param tag the consumer tag + */ + void reportConsumerStarted(const std::string &tag) + { + if (_handler) _handler->onConsumerStarted(_parent, tag); + } + + /** + * Report that a consumer has stopped + * @param tag the consumer tag + */ + void reportConsumerStopped(const std::string &tag) + { + if (_handler) _handler->onConsumerStopped(_parent, tag); + } + + /** + * Report the consumed message + */ + void reportDelivery(); + + /** + * Create an incoming message + * @param frame + * @return MessageImpl + */ + MessageImpl *message(const BasicDeliverFrame &frame); + + /** + * Retrieve the current incoming message + * @return MessageImpl + */ + MessageImpl *message() + { + return _message; + } + /** * The channel class is its friend, thus can it instantiate this object */ diff --git a/include/classes.h b/include/classes.h index e480821..57ef08d 100644 --- a/include/classes.h +++ b/include/classes.h @@ -14,6 +14,9 @@ namespace AMQP { * All classes defined by this library */ class Array; +class BasicDeliverFrame; +class BasicHeaderFrame; +class BodyFrame; class Channel; class Connection; class ConnectionHandler; @@ -21,6 +24,7 @@ class ConnectionImpl; class Exchange; class Frame; class Login; +class MessageImpl; class Monitor; class OutBuffer; class ReceivedFrame; diff --git a/include/envelope.h b/include/envelope.h index eaa33ae..c793eab 100644 --- a/include/envelope.h +++ b/include/envelope.h @@ -17,7 +17,7 @@ namespace AMQP { */ class Envelope : public MetaData { -private: +protected: /** * Pointer to the body data (the memory buffer is not managed by the AMQP * library!) @@ -29,7 +29,7 @@ private: * Size of the data * @var uint64_t */ - uint64_t _bodysize; + uint64_t _bodySize; public: /** @@ -41,13 +41,13 @@ public: * @param body * @param size */ - Envelope(const char *body, uint64_t size) : MetaData(), _body(body), _bodysize(size) {} + Envelope(const char *body, uint64_t size) : MetaData(), _body(body), _bodySize(size) {} /** * Constructor based on a string * @param body */ - Envelope(const std::string &body) : MetaData(), _body(body.data()), _bodysize(body.size()) {} + Envelope(const std::string &body) : MetaData(), _body(body.data()), _bodySize(body.size()) {} /** * Destructor @@ -69,7 +69,16 @@ public: */ uint64_t bodySize() const { - return _bodysize; + return _bodySize; + } + + /** + * Body as a string + * @return string + */ + std::string message() const + { + return std::string(_body, _bodySize); } }; diff --git a/include/message.h b/include/message.h new file mode 100644 index 0000000..c55b42b --- /dev/null +++ b/include/message.h @@ -0,0 +1,124 @@ +/** + * Message.h + * + * An incoming message has the same sort of information as an outgoing + * message, plus some additional information. + * + * Message objects can not be constructed by end users, they are only constructed + * by the AMQP library, and passed to the ChannelHandler::onDelivered() method + * + * @copyright 2014 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class Message : public Envelope +{ +protected: + /** + * The consumer tag over which it was delivered + * @var string + */ + std::string _consumerTag; + + /** + * Unique delivery tag to identify and ack the mesage + * @var uint64_t + */ + uint64_t _deliveryTag; + + /** + * Is this a redelivered message / has it been delivered before? + * @var bool + */ + bool _redelivered; + + /** + * The exchange to which it was originally published + * @var string + */ + std::string _exchange; + + /** + * The routing key that was originally used + * @var string + */ + std::string _routingKey; + +protected: + /** + * The constructor is protected to ensure that endusers can not + * instantiate a message + * @param consumerTag + * @param deliveryTag + * @param redelivered + * @param exchange + * @param routingKey + */ + Message(const std::string &consumerTag, uint64_t deliveryTag, bool redelivered, const std::string &exchange, const std::string &routingKey) : + Envelope(nullptr, 0), _consumerTag(consumerTag), _deliveryTag(deliveryTag), _redelivered(redelivered), _exchange(exchange), _routingKey(routingKey) + {} + +public: + /** + * Destructor + */ + virtual ~Message() {} + + /** + * The consumer tag over which it was delivered + * @return string + */ + std::string &consumerTag() + { + return _consumerTag; + } + + /** + * Unique delivery tag to identify and ack the mesage + * @return uint64_t + */ + uint64_t deliveryTag() + { + return _deliveryTag; + } + + /** + * Is this a redelivered message / has it been delivered before? + * @var bool + */ + bool redelivered() + { + return _redelivered; + } + + /** + * The exchange to which it was originally published + * @var string + */ + std::string &exchange() + { + return _exchange; + } + + /** + * The routing key that was originally used + * @var string + */ + std::string &routingKey() + { + return _routingKey; + } +}; + +/** + * End of namespace + */ +} + diff --git a/include/metadata.h b/include/metadata.h index a88cc47..7c1714a 100644 --- a/include/metadata.h +++ b/include/metadata.h @@ -153,6 +153,31 @@ public: */ virtual ~MetaData() {} + /** + * Set all meta data + * @param data + */ + void set(const MetaData &data) + { + // simply copy all fields + _bools1 = data._bools1; + _bools2 = data._bools2; + _contentType = data._contentType; + _contentEncoding = data._contentEncoding; + _headers = data._headers; + _deliveryMode = data._deliveryMode; + _priority = data._priority; + _correlationID = data._correlationID; + _replyTo = data._replyTo; + _expiration = data._expiration; + _messageID = data._messageID; + _timestamp = data._timestamp; + _typeName = data._typeName; + _userID = data._userID; + _appID = data._appID; + _clusterID = data._clusterID; + } + /** * Check if a certain field is set * @return bool diff --git a/libamqp.h b/libamqp.h index 7726b42..83dc696 100644 --- a/libamqp.h +++ b/libamqp.h @@ -44,6 +44,7 @@ // envelope for publishing and consuming #include #include +#include // mid level includes #include diff --git a/src/Makefile b/src/Makefile index d3e4050..b7bfee2 100644 --- a/src/Makefile +++ b/src/Makefile @@ -3,27 +3,30 @@ RM = rm -f CPPFLAGS = -Wall -c -I. -O2 -flto -std=c++11 -g LD = g++ LD_FLAGS = -Wall -shared -O2 -RESULT = liblibamqp.so - -STATIC = $(RESULT:%.so=%.a) +SHARED_LIB = liblibamqp.so +STATIC_LIB = $(SHARED_LIB:%.so=%.a) SOURCES = $(wildcard *.cpp) -OBJECTS = $(SOURCES:%.cpp=%.o) -OBJECTS_STATIC = $(SOURCES:%.cpp=%.s.o) +SHARED_OBJECTS = $(SOURCES:%.cpp=%.o) +STATIC_OBJECTS = $(SOURCES:%.cpp=%.s.o) -all: ${OBJECTS} ${OBJECTS_STATIC} ${RESULT} - $(MAKE) STATIC_LIB +all: shared static -${RESULT}: ${OBJECTS} - ${LD} ${LD_FLAGS} -o $@ ${OBJECTS} +shared: ${SHARED_OBJECTS} ${SHARED_LIB} + +static: ${STATIC_OBJECTS} ${STATIC_LIB} + +${SHARED_LIB}: ${SHARED_OBJECTS} + ${LD} ${LD_FLAGS} -o $@ ${SHARED_OBJECTS} + +${STATIC_LIB}: ${STATIC_OBJECTS} + ar rcs ${STATIC_LIB} ${STATIC_OBJECTS} clean: - ${RM} *.obj *~* ${OBJECTS} ${OBJECTS_STATIC} ${RESULT} ${STATIC} + ${RM} *.obj *~* ${SHARED_OBJECTS} ${STATIC_OBJECTS} ${SHARED_LIB} ${STATIC_LIB} -${OBJECTS}: +${SHARED_OBJECTS}: ${CPP} ${CPPFLAGS} -fpic -o $@ ${@:%.o=%.cpp} -${OBJECTS_STATIC}: +${STATIC_OBJECTS}: ${CPP} ${CPPFLAGS} -o $@ ${@:%.s.o=%.cpp} -STATIC_LIB: - ar rcs ${STATIC} ${OBJECTS_STATIC} diff --git a/src/basiccancelokframe.h b/src/basiccancelokframe.h index b5f9c58..a4edff6 100644 --- a/src/basiccancelokframe.h +++ b/src/basiccancelokframe.h @@ -81,6 +81,26 @@ public: { return 31; } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if (!channel) return false; + + // report + channel->reportConsumerStopped(consumerTag()); + + // done + return true; + } }; /** diff --git a/src/basicconsumeokframe.h b/src/basicconsumeokframe.h index 2f10a39..3a26e87 100644 --- a/src/basicconsumeokframe.h +++ b/src/basicconsumeokframe.h @@ -79,6 +79,26 @@ public: { return _consumerTag; } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if (!channel) return false; + + // report + channel->reportConsumerStarted(consumerTag()); + + // done + return true; + } }; /** diff --git a/src/basicdeliverframe.h b/src/basicdeliverframe.h index 2800f4d..aef677f 100644 --- a/src/basicdeliverframe.h +++ b/src/basicdeliverframe.h @@ -155,6 +155,26 @@ public: { return _redelivered.get(0); } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if (!channel) return false; + + // construct the message + channel->message(*this); + + // done + return true; + } }; /** diff --git a/src/basicheaderframe.h b/src/basicheaderframe.h index f57cdd5..942f39d 100644 --- a/src/basicheaderframe.h +++ b/src/basicheaderframe.h @@ -101,6 +101,33 @@ public: { return 60; } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if (!channel) return false; + + // is there a current message? + MessageImpl *message = channel->message(); + if (!message) return false; + + // store size + message->setBodySize(bodySize()); + + // and copy the meta data + message->set(_metadata); + + // done + return true; + } }; /** diff --git a/src/basicqosokframe.cpp b/src/basicqosokframe.cpp deleted file mode 100644 index 3394cb9..0000000 --- a/src/basicqosokframe.cpp +++ /dev/null @@ -1,40 +0,0 @@ -/** - * BasicQosOkFrame.cpp - * - * @copyright 2014 Copernica BV - */ -#include "includes.h" -#include "basicqosokframe.h" - -/** - * Set up namespace - */ -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool BasicQosOKFrame::process(ConnectionImpl *connection) -{ - // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); - - // channel does not exist - if (!channel) return false; - - // report - channel->reportQosSet(); - - // done - return true; - - -} - -/** - * End of namespace - */ -} - diff --git a/src/basicqosokframe.h b/src/basicqosokframe.h index be1dd7c..3aaac49 100644 --- a/src/basicqosokframe.h +++ b/src/basicqosokframe.h @@ -58,8 +58,20 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection); - + virtual bool process(ConnectionImpl *connection) + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if (!channel) return false; + + // report + channel->reportQosSet(); + + // done + return true; + } }; /** diff --git a/src/bodyframe.h b/src/bodyframe.h index 811a278..f8318c0 100644 --- a/src/bodyframe.h +++ b/src/bodyframe.h @@ -91,6 +91,35 @@ public: { return _payload; } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if (!channel) return false; + + // is there a current message? + MessageImpl *message = channel->message(); + if (!message) return false; + + // store size + if (!message->append(_payload, _size)) return true; + + // the message is complete + channel->reportDelivery(); + + // done + return true; + } + + }; /** diff --git a/src/channelcloseframe.cpp b/src/channelcloseframe.cpp deleted file mode 100644 index 226bc15..0000000 --- a/src/channelcloseframe.cpp +++ /dev/null @@ -1,42 +0,0 @@ -/** - * ChannelCloseFrame.cpp - * - * @copyright 2014 Copernica BV - */ -#include "includes.h" -#include "channelcloseframe.h" -#include "channelcloseokframe.h" - -/** - * Set up namespace - */ -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool ChannelCloseFrame::process(ConnectionImpl *connection) -{ - // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); - - // send back an ok frame - connection->send(ChannelCloseOKFrame(this->channel())); - - // what if channel doesn't exist? - if (!channel) return false; - - // report to the handler - channel->reportError(text()); - - // done - return true; -} - -/** - * End of namespace - */ -} - diff --git a/src/channelcloseframe.h b/src/channelcloseframe.h index 59f8ca0..6db745f 100644 --- a/src/channelcloseframe.h +++ b/src/channelcloseframe.h @@ -144,8 +144,23 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override; - + virtual bool process(ConnectionImpl *connection) override + { + // check if we have a channel + ChannelImpl *channel = connection->channel(this->channel()); + + // send back an ok frame + connection->send(ChannelCloseOKFrame(this->channel())); + + // what if channel doesn't exist? + if (!channel) return false; + + // report to the handler + channel->reportError(text()); + + // done + return true; + } }; /** diff --git a/src/channelcloseokframe.cpp b/src/channelcloseokframe.cpp deleted file mode 100644 index 2e539d1..0000000 --- a/src/channelcloseokframe.cpp +++ /dev/null @@ -1,38 +0,0 @@ -/** - * ChannelCloseOkFrame.cpp - * - * @copyright 2014 Copernica BV - */ -#include "includes.h" -#include "channelcloseokframe.h" - -/** - * Set up namespace - */ -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool ChannelCloseOKFrame::process(ConnectionImpl *connection) -{ - // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); - - // channel does not exist - if(!channel) return false; - - // report that the channel is closed - channel->reportClosed(); - - // done - return true; -} - -/** - * End of namespace - */ -} - diff --git a/src/channelcloseokframe.h b/src/channelcloseokframe.h index 1702734..8a17089 100644 --- a/src/channelcloseokframe.h +++ b/src/channelcloseokframe.h @@ -62,7 +62,20 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override; + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // report that the channel is closed + channel->reportClosed(); + + // done + return true; + } }; /** diff --git a/src/channelflowokframe.cpp b/src/channelflowokframe.cpp deleted file mode 100644 index c933d09..0000000 --- a/src/channelflowokframe.cpp +++ /dev/null @@ -1,39 +0,0 @@ -/** - * ChannelFlowOkFrame.cpp - * - * @copyright 2014 Copernica BV - */ -#include "includes.h" -#include "channelflowokframe.h" - -/** - * Set up namespace - */ -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool ChannelFlowOKFrame::process(ConnectionImpl *connection) -{ - // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); - - // channel does not exist - if (!channel) return false; - - // is the flow active? - if (active()) channel->reportResumed(); - else channel->reportPaused(); - - // done - return true; -} - -/** - * End of namespace - */ -} - diff --git a/src/channelflowokframe.h b/src/channelflowokframe.h index e633e84..efc3ae6 100644 --- a/src/channelflowokframe.h +++ b/src/channelflowokframe.h @@ -84,7 +84,21 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override; + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if (!channel) return false; + + // is the flow active? + if (active()) channel->reportResumed(); + else channel->reportPaused(); + + // done + return true; + } }; /** diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index c447e31..a3fdebb 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -6,8 +6,11 @@ * @copyright 2014 Copernica BV */ #include "includes.h" +#include "basicdeliverframe.h" +#include "messageimpl.h" #include "channelopenframe.h" #include "channelflowframe.h" +#include "channelcloseokframe.h" #include "channelcloseframe.h" #include "transactionselectframe.h" #include "transactioncommitframe.h" @@ -25,8 +28,8 @@ #include "basicheaderframe.h" #include "bodyframe.h" #include "basicqosframe.h" - -#include +#include "basicconsumeframe.h" +#include "basiccancelframe.h" /** * Set up namespace @@ -71,6 +74,10 @@ ChannelImpl::ChannelImpl(Channel *parent, Connection *connection, ChannelHandler */ ChannelImpl::~ChannelImpl() { + // remove incoming message + if (_message) delete _message; + _message = nullptr; + // remove this channel from the connection _connection->_implementation.remove(this); @@ -78,7 +85,10 @@ ChannelImpl::~ChannelImpl() if (!connected()) return; // close the channel now + // @todo is this ok? close(); + + // do we have } /** @@ -390,6 +400,8 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin { // @todo prevent crash when connection is destructed + // @todo do not copy the entire buffer to individual frames + // send the publish frame send(BasicPublishFrame(_id, exchange, routingKey, flags & mandatory, flags & immediate)); @@ -429,13 +441,44 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin */ bool ChannelImpl::setQos(uint16_t prefetchCount) { - // set for the entire connection + // send a qos frame send(BasicQosFrame(_id, prefetchCount, false)); // done return true; } +/** + * Tell the RabbitMQ server that we're ready to consume messages + * @param queue the queue from which you want to consume + * @param tag a consumer tag that will be associated with this consume operation + * @param flags additional flags + * @param arguments additional arguments + * @return bool + */ +bool ChannelImpl::consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments) +{ + // send a consume frame + send(BasicConsumeFrame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, flags & nowait, arguments)); + + // done + return true; +} + +/** + * Cancel a running consumer + * @param tag the consumer tag + * @param flags optional flags + */ +bool ChannelImpl::cancel(const std::string &tag, int flags) +{ + // send a cancel frame + send(BasicCancelFrame(_id, tag, flags & nowait)); + + // done + return true; +} + /** * Send a frame over the channel * @param frame frame to send @@ -447,6 +490,28 @@ size_t ChannelImpl::send(const Frame &frame) return _connection->_implementation.send(frame); } +/** + * Report the consumed message + */ +void ChannelImpl::reportDelivery() +{ + if (_handler) _handler->onConsumed(_parent, *_message); +} + +/** + * Create an incoming message + * @param frame + * @return MessageImpl + */ +MessageImpl *ChannelImpl::message(const BasicDeliverFrame &frame) +{ + // it should not be possible that a message already exists, but lets check it anyhow + if (_message) delete _message; + + // construct a message + return _message = new MessageImpl(frame); +} + /** * End of namespace */ diff --git a/src/channelopenokframe.cpp b/src/channelopenokframe.cpp deleted file mode 100644 index 552a474..0000000 --- a/src/channelopenokframe.cpp +++ /dev/null @@ -1,38 +0,0 @@ -/** - * ChannelOpenOkFrame.cpp - * - * @copyright 2014 Copernica BV - */ -#include "includes.h" -#include "channelopenokframe.h" - -/** - * Set up namespace - */ -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool ChannelOpenOKFrame::process(ConnectionImpl *connection) -{ - // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); - - // channel does not exist - if(!channel) return false; - - // report that the channel is open - channel->reportReady(); - - // done - return true; -} - -/** - * End of namespace - */ -} - diff --git a/src/channelopenokframe.h b/src/channelopenokframe.h index 6976936..44cd57a 100644 --- a/src/channelopenokframe.h +++ b/src/channelopenokframe.h @@ -68,7 +68,20 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override; + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // report that the channel is open + channel->reportReady(); + + // done + return true; + } }; /** diff --git a/src/connectioncloseframe.cpp b/src/connectioncloseframe.cpp deleted file mode 100644 index eecff10..0000000 --- a/src/connectioncloseframe.cpp +++ /dev/null @@ -1,39 +0,0 @@ -/** - * ConnectionCloseFrame.cpp - * - * @copyright 2014 Copernica BV - */ -#include "includes.h" -#include "connectioncloseframe.h" -#include "connectioncloseokframe.h" - -/** - * Set up namespace - */ -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool ConnectionCloseFrame::process(ConnectionImpl *connection) -{ - // @todo connection could be destructed after frame was sent - - // send back the ok frame - connection->send(ConnectionCloseOKFrame()); - - // no need to check for a channel, the error is connection wide - // report the error on the connection - connection->reportError(text()); - - // done - return true; -} - -/** - * End of namespace - */ -} - diff --git a/src/connectioncloseframe.h b/src/connectioncloseframe.h index 5a4e346..465a0c0 100644 --- a/src/connectioncloseframe.h +++ b/src/connectioncloseframe.h @@ -144,8 +144,20 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override; - + virtual bool process(ConnectionImpl *connection) override + { + // @todo connection could be destructed after frame was sent + + // send back the ok frame + connection->send(ConnectionCloseOKFrame()); + + // no need to check for a channel, the error is connection wide + // report the error on the connection + connection->reportError(text()); + + // done + return true; + } }; /** diff --git a/src/connectionimpl.cpp b/src/connectionimpl.cpp index 8dbb101..ad58663 100644 --- a/src/connectionimpl.cpp +++ b/src/connectionimpl.cpp @@ -7,8 +7,6 @@ */ #include "includes.h" #include "protocolheaderframe.h" -#include "exception.h" -#include "protocolexception.h" /** * set namespace @@ -109,6 +107,8 @@ void ConnectionImpl::remove(ChannelImpl *channel) */ size_t ConnectionImpl::parse(char *buffer, size_t size) { + // @todo do not parse if already in an error state + // number of bytes processed size_t processed = 0; diff --git a/src/connectionopenokframe.cpp b/src/connectionopenokframe.cpp deleted file mode 100644 index 7d085dd..0000000 --- a/src/connectionopenokframe.cpp +++ /dev/null @@ -1,32 +0,0 @@ -/** - * ConnectionOpenOKFrame.cpp - * - * @copyright 2014 Copernica BV - */ -#include "includes.h" -#include "connectionopenokframe.h" - -/** - * Set up namespace - */ -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool ConnectionOpenOKFrame::process(ConnectionImpl *connection) -{ - // all is ok, mark the connection as connected - connection->setConnected(); - - // done - return true; -} - -/** - * End of namespace - */ -} - diff --git a/src/connectionopenokframe.h b/src/connectionopenokframe.h index e131e2a..304007f 100644 --- a/src/connectionopenokframe.h +++ b/src/connectionopenokframe.h @@ -77,7 +77,14 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override; + virtual bool process(ConnectionImpl *connection) override + { + // all is ok, mark the connection as connected + connection->setConnected(); + + // done + return true; + } }; /** diff --git a/src/connectionstartframe.cpp b/src/connectionstartframe.cpp deleted file mode 100644 index 12efd98..0000000 --- a/src/connectionstartframe.cpp +++ /dev/null @@ -1,50 +0,0 @@ -/** - * ConnectionStartFrame.h - * - * @copyright 2014 Copernica BV - */ -#include "includes.h" -#include "connectionstartframe.h" -#include "connectionstartokframe.h" - -/** - * Set up namespace - */ -namespace AMQP { - -/** - * Process the connection start frame - * @param connection - * @return bool - * @internal - */ -bool ConnectionStartFrame::process(ConnectionImpl *connection) -{ - // @todo we must still be in protocol handshake mode - - - // the peer properties - Table properties; - - // fill the peer properties - properties["product"] = "Copernica AMQP library"; - properties["version"] = "0.1"; - properties["platform"] = "Ubuntu"; - properties["copyright"] = "Copyright 2014 Copernica BV"; - properties["information"] = ""; - - // move connection to handshake mode - connection->setProtocolOk(); - - // send back a connection start ok frame - connection->send(ConnectionStartOKFrame(properties, "PLAIN", connection->login().saslPlain(), "en_US")); - - // done - return true; -} - -/** - * End of namespace - */ -} - diff --git a/src/connectionstartframe.h b/src/connectionstartframe.h index d3607d0..d462531 100644 --- a/src/connectionstartframe.h +++ b/src/connectionstartframe.h @@ -171,8 +171,30 @@ public: * @return bool * @internal */ - virtual bool process(ConnectionImpl *connection) override; - + virtual bool process(ConnectionImpl *connection) override + { + // @todo we must still be in protocol handshake mode + + + // the peer properties + Table properties; + + // fill the peer properties + properties["product"] = "Copernica AMQP library"; + properties["version"] = "0.1"; + properties["platform"] = "Ubuntu"; + properties["copyright"] = "Copyright 2014 Copernica BV"; + properties["information"] = ""; + + // move connection to handshake mode + connection->setProtocolOk(); + + // send back a connection start ok frame + connection->send(ConnectionStartOKFrame(properties, "PLAIN", connection->login().saslPlain(), "en_US")); + + // done + return true; + } }; /** diff --git a/src/connectiontuneframe.cpp b/src/connectiontuneframe.cpp deleted file mode 100644 index 133eae4..0000000 --- a/src/connectiontuneframe.cpp +++ /dev/null @@ -1,48 +0,0 @@ -/** - * ConnectionTuneFrame.cpp - * - * @copyright 2014 - */ -#include "includes.h" -#include "connectiontuneframe.h" -#include "connectiontuneokframe.h" -#include "connectionopenframe.h" - -/** - * Set up namespace - */ -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool ConnectionTuneFrame::process(ConnectionImpl *connection) -{ - // @todo this is only allowed when the connection is set up - - - // remember this in the connection - connection->setCapacity(channelMax(), frameMax()); - - // theoretically it is possible that the connection object gets destructed between sending the messages - Monitor monitor(connection); - - // send it back - connection->send(ConnectionTuneOKFrame(channelMax(), frameMax(), heartbeat())); - - // check if the connection object still exists - if (!monitor.valid()) return true; - - // and finally we start to open the frame - connection->send(ConnectionOpenFrame(connection->vhost())); - - // done - return true; -} - -/** - * End of namespace - */ -} diff --git a/src/connectiontuneframe.h b/src/connectiontuneframe.h index 1a8cac2..4cdb535 100644 --- a/src/connectiontuneframe.h +++ b/src/connectiontuneframe.h @@ -129,7 +129,29 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override; + virtual bool process(ConnectionImpl *connection) override + { + // @todo this is only allowed when the connection is set up + + + // remember this in the connection + connection->setCapacity(channelMax(), frameMax()); + + // theoretically it is possible that the connection object gets destructed between sending the messages + Monitor monitor(connection); + + // send it back + connection->send(ConnectionTuneOKFrame(channelMax(), frameMax(), heartbeat())); + + // check if the connection object still exists + if (!monitor.valid()) return true; + + // and finally we start to open the frame + connection->send(ConnectionOpenFrame(connection->vhost())); + + // done + return true; + } }; /** diff --git a/src/exchangebindokframe.cpp b/src/exchangebindokframe.cpp deleted file mode 100644 index 2e49270..0000000 --- a/src/exchangebindokframe.cpp +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Exchangebindokframe.cpp - */ - -#include "includes.h" -#include "exchangebindokframe.h" - -// setup namespace -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool ExchangeBindOKFrame::process(ConnectionImpl *connection) -{ - // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); - - // channel does not exist - if(!channel) return false; - - // report to handler - channel->reportExchangeBound(); - - // done - return true; -} - -// end namespace -} diff --git a/src/exchangebindokframe.h b/src/exchangebindokframe.h index e232b86..ad3c54c 100644 --- a/src/exchangebindokframe.h +++ b/src/exchangebindokframe.h @@ -58,8 +58,20 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override; - + virtual bool process(ConnectionImpl *connection) override + { + // check if we have a channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // report to handler + channel->reportExchangeBound(); + + // done + return true; + } }; // end namespace diff --git a/src/exchangedeclareokframe.cpp b/src/exchangedeclareokframe.cpp deleted file mode 100644 index 15068f1..0000000 --- a/src/exchangedeclareokframe.cpp +++ /dev/null @@ -1,34 +0,0 @@ -/** - * ExchangeDeclareOKFrame.cpp - * - * @copyright 2014 Copernica BV - */ - -#include "includes.h" -#include "exchangedeclareokframe.h" - -// setup namespace -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool ExchangeDeclareOKFrame::process(ConnectionImpl *connection) -{ - // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); - - // channel does not exist - if(!channel) return false; - - // report exchange declare ok - channel->reportExchangeDeclared(); - - // done - return true; -} - -// end namespace -} diff --git a/src/exchangedeclareokframe.h b/src/exchangedeclareokframe.h index 9774ade..b271601 100644 --- a/src/exchangedeclareokframe.h +++ b/src/exchangedeclareokframe.h @@ -61,8 +61,20 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override; - + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // report exchange declare ok + channel->reportExchangeDeclared(); + + // done + return true; + } }; /** diff --git a/src/exchangedeleteokframe.cpp b/src/exchangedeleteokframe.cpp deleted file mode 100644 index 0bd77e5..0000000 --- a/src/exchangedeleteokframe.cpp +++ /dev/null @@ -1,38 +0,0 @@ -/** - * ExchangeDeleteOKFrame.cpp - * - * @copyright 2014 Copernica BV - */ -#include "includes.h" -#include "exchangedeleteokframe.h" - -/** - * Set up namespace - */ -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool ExchangeDeleteOKFrame::process(ConnectionImpl *connection) -{ - // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); - - // channel does not exist - if(!channel) return false; - - // report to handler - channel->reportExchangeDeleted(); - - // done - return true; -} - -/** - * End of namespace - */ -} - diff --git a/src/exchangedeleteokframe.h b/src/exchangedeleteokframe.h index e1f8d02..c386358 100644 --- a/src/exchangedeleteokframe.h +++ b/src/exchangedeleteokframe.h @@ -62,7 +62,20 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override; + virtual bool process(ConnectionImpl *connection) override + { + // check if we have a channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // report to handler + channel->reportExchangeDeleted(); + + // done + return true; + } }; /** diff --git a/src/exchangeunbindokframe.cpp b/src/exchangeunbindokframe.cpp deleted file mode 100644 index 5e80778..0000000 --- a/src/exchangeunbindokframe.cpp +++ /dev/null @@ -1,32 +0,0 @@ -/** - * QueuePurgeOKFrame.cpp - */ - -#include "includes.h" -#include "exchangeunbindokframe.h" - -// setup namespace -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool ExchangeUnbindOKFrame::process(ConnectionImpl *connection) -{ - // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); - - // channel does not exist - if(!channel) return false; - - // report to handler - channel->reportExchangeUnbound(); - - // done - return true; -} - -// end namespace -} diff --git a/src/exchangeunbindokframe.h b/src/exchangeunbindokframe.h index 1c66703..f3a2a9b 100644 --- a/src/exchangeunbindokframe.h +++ b/src/exchangeunbindokframe.h @@ -59,7 +59,20 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override; + virtual bool process(ConnectionImpl *connection) override + { + // check if we have a channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // report to handler + channel->reportExchangeUnbound(); + + // done + return true; + } }; // end namespace diff --git a/src/extframe.cpp b/src/extframe.cpp deleted file mode 100644 index 7d215b3..0000000 --- a/src/extframe.cpp +++ /dev/null @@ -1,33 +0,0 @@ -/** - * ExtFrame.cpp - * - * @copyright 2014 Copernica BV - */ -#include "includes.h" -#include "exception.h" -#include "protocolexception.h" - -/** - * Set up namespace - */ -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool ExtFrame::process(ConnectionImpl *connection) -{ - // this is an exception - throw ProtocolException("unimplemented frame type " + std::to_string(type())); - - // unreachable - return false; -} - -/** - * End of namespace - */ -} - diff --git a/src/extframe.h b/src/extframe.h index c37d4ed..eb35c68 100644 --- a/src/extframe.h +++ b/src/extframe.h @@ -137,8 +137,14 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection); - + virtual bool process(ConnectionImpl *connection) override + { + // this is an exception + throw ProtocolException("unimplemented frame type " + std::to_string(type())); + + // unreachable + return false; + } }; /** diff --git a/src/frame.cpp b/src/frame.cpp deleted file mode 100644 index e7cc89f..0000000 --- a/src/frame.cpp +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Frame.cpp - * - * @copyright 2014 Copernica BV - */ -#include "includes.h" -#include "exception.h" -#include "protocolexception.h" - -/** - * Set up namespace - */ -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool Frame::process(ConnectionImpl *connection) -{ - // this is an exception - throw ProtocolException("unimplemented frame"); - - // unreachable - return false; -} - -/** - * End of namespace - */ -} - diff --git a/src/frame.h b/src/frame.h index 90298a3..ed8ad0a 100644 --- a/src/frame.h +++ b/src/frame.h @@ -59,7 +59,14 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection); + virtual bool process(ConnectionImpl *connection) + { + // this is an exception + throw ProtocolException("unimplemented frame"); + + // unreachable + return false; + } }; /** diff --git a/src/heartbeatframe.cpp b/src/heartbeatframe.cpp deleted file mode 100644 index 8861c6c..0000000 --- a/src/heartbeatframe.cpp +++ /dev/null @@ -1,32 +0,0 @@ -/** - * HeartbeatFrame.cpp - * - * @copyright 2014 Copernica BV - */ -#include "includes.h" -#include "heartbeatframe.h" - -/** - * Namespace - */ -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool HeartbeatFrame::process(ConnectionImpl *connection) -{ - // send back the same frame - connection->send(*this); - - // done - return true; -} - -/** - * End namespace - */ -} - diff --git a/src/heartbeatframe.h b/src/heartbeatframe.h index 103cec6..d046420 100644 --- a/src/heartbeatframe.h +++ b/src/heartbeatframe.h @@ -55,8 +55,14 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override; + virtual bool process(ConnectionImpl *connection) override + { + // send back the same frame + connection->send(*this); + // done + return true; + } }; /** diff --git a/src/includes.h b/src/includes.h index d54e529..80d3d04 100644 --- a/src/includes.h +++ b/src/includes.h @@ -11,6 +11,8 @@ #include "../libamqp.h" // classes that are very commonly used +#include "exception.h" +#include "protocolexception.h" #include "frame.h" #include "extframe.h" #include "methodframe.h" diff --git a/src/messageimpl.h b/src/messageimpl.h new file mode 100644 index 0000000..e4f60c6 --- /dev/null +++ b/src/messageimpl.h @@ -0,0 +1,106 @@ +/** + * MessageImpl.h + * + * Implementation of the message object that is only accessible for the + * AMQP library internals + * + * @copyright 2014 Copernica BV + */ + +/** + * Namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class MessageImpl : public Message +{ +private: + /** + * How many bytes have been received? + * @var uint64_t + */ + uint64_t _received; + + /** + * Was the buffer allocated by us? + * @var bool + */ + bool _selfAllocated; + + +public: + /** + * Constructor + * @param frame + */ + MessageImpl(const BasicDeliverFrame &frame) : + Message(frame.consumerTag(), frame.deliveryTag(), frame.redelivered(), frame.exchange(), frame.routingKey()), + _received(0), _selfAllocated(false) + {} + + /** + * Destructor + */ + virtual ~MessageImpl() + { + // clear up memory if it was self allocated + if (_selfAllocated) delete[] _body; + } + + /** + * Set the body size + * This field is set when the header is received + * @param uint64_t + */ + void setBodySize(uint64_t size) + { + _bodySize = size; + } + + /** + * Append data + * @param buffer incoming data + * @param size size of the data + * @return bool true if the message is now complete + */ + bool append(const char *buffer, uint64_t size) + { + // is this the only data, and also direct complete? + if (_received == 0 && size >= _bodySize) + { + // we have everything + _body = buffer; + _received = _bodySize; + + // done + return true; + } + else + { + // it does not yet fit, do we have to allocate? + if (!_body) _body = new char[_bodySize]; + _selfAllocated = true; + + // prevent that size is too big + if (size > _bodySize - _received) size = _bodySize - _received; + + // append data + memcpy((char *)(_body + _received), buffer, size); + + // we have more data now + _received += size; + + // done + return _received >= _bodySize; + } + } +}; + +/** + * End of namespace + */ +} + diff --git a/src/methodframe.cpp b/src/methodframe.cpp deleted file mode 100644 index 6752658..0000000 --- a/src/methodframe.cpp +++ /dev/null @@ -1,33 +0,0 @@ -/** - * MethodFrame.cpp - * - * @copyright 2014 Copernica BV - */ -#include "includes.h" -#include "exception.h" -#include "protocolexception.h" - -/** - * Set up namespace - */ -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool MethodFrame::process(ConnectionImpl *connection) -{ - // this is an exception - throw ProtocolException("unimplemented frame type " + std::to_string(type()) + " class " + std::to_string(classID()) + " method " + std::to_string(methodID())); - - // unreachable - return false; -} - -/** - * End of namespace - */ -} - diff --git a/src/methodframe.h b/src/methodframe.h index 029ed04..749e088 100644 --- a/src/methodframe.h +++ b/src/methodframe.h @@ -75,7 +75,14 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection); + virtual bool process(ConnectionImpl *connection) override + { + // this is an exception + throw ProtocolException("unimplemented frame type " + std::to_string(type()) + " class " + std::to_string(classID()) + " method " + std::to_string(methodID())); + + // unreachable + return false; + } }; /** diff --git a/src/queuebindokframe.cpp b/src/queuebindokframe.cpp deleted file mode 100644 index f7b72ae..0000000 --- a/src/queuebindokframe.cpp +++ /dev/null @@ -1,31 +0,0 @@ -/** - * QueueDeclareOKFrame.cpp - */ - -#include "includes.h" -#include "queuebindokframe.h" - -// setup namespace -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool QueueBindOKFrame::process(ConnectionImpl *connection) -{ - // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); - - // channel does not exist - if(!channel) return false; - - // report to handler - channel->reportQueueBound(); - - // done - return true; -} - -} diff --git a/src/queuebindokframe.h b/src/queuebindokframe.h index feab3df..6c66cb2 100644 --- a/src/queuebindokframe.h +++ b/src/queuebindokframe.h @@ -60,8 +60,20 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override; - + virtual bool process(ConnectionImpl *connection) override + { + // check if we have a channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // report to handler + channel->reportQueueBound(); + + // done + return true; + } }; /** diff --git a/src/queuedeclareokframe.cpp b/src/queuedeclareokframe.cpp deleted file mode 100644 index 842fd50..0000000 --- a/src/queuedeclareokframe.cpp +++ /dev/null @@ -1,31 +0,0 @@ -/** - * QueueDeclareOKFrame.cpp - */ - -#include "includes.h" -#include "queuedeclareokframe.h" - -// setup namespace -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool QueueDeclareOKFrame::process(ConnectionImpl *connection) -{ - // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); - - // what if channel doesn't exist? - if (!channel) return false; - - // report to the handler - channel->reportQueueDeclared(this->name(), this->messageCount(), this->consumerCount()); - - // done - return true; -} - -} diff --git a/src/queuedeclareokframe.h b/src/queuedeclareokframe.h index 79df971..3cbc113 100644 --- a/src/queuedeclareokframe.h +++ b/src/queuedeclareokframe.h @@ -124,7 +124,20 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override; + virtual bool process(ConnectionImpl *connection) override + { + // check if we have a channel + ChannelImpl *channel = connection->channel(this->channel()); + + // what if channel doesn't exist? + if (!channel) return false; + + // report to the handler + channel->reportQueueDeclared(this->name(), this->messageCount(), this->consumerCount()); + + // done + return true; + } }; /** diff --git a/src/queuedeleteokframe.cpp b/src/queuedeleteokframe.cpp deleted file mode 100644 index ce1811b..0000000 --- a/src/queuedeleteokframe.cpp +++ /dev/null @@ -1,33 +0,0 @@ -/** - * QueueDeleteOKFrame.cpp - */ - -#include "includes.h" -#include "queuedeleteokframe.h" - -// setup namespace -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool QueueDeleteOKFrame::process(ConnectionImpl *connection) -{ - // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); - - // channel does not exist - if(!channel) return false; - - // report queue deletion success - channel->reportQueueDeleted(this->messageCount()); - - // done - return true; -} - - -// end namespace -} diff --git a/src/queuedeleteokframe.h b/src/queuedeleteokframe.h index 337a683..881ed2b 100644 --- a/src/queuedeleteokframe.h +++ b/src/queuedeleteokframe.h @@ -85,8 +85,20 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override; - + virtual bool process(ConnectionImpl *connection) override + { + // check if we have a channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // report queue deletion success + channel->reportQueueDeleted(this->messageCount()); + + // done + return true; + } }; /** diff --git a/src/queuepurgeokframe.cpp b/src/queuepurgeokframe.cpp deleted file mode 100644 index 3673bff..0000000 --- a/src/queuepurgeokframe.cpp +++ /dev/null @@ -1,32 +0,0 @@ -/** - * QueuePurgeOKFrame.cpp - */ - -#include "includes.h" -#include "queuepurgeokframe.h" - -// setup namespace -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool QueuePurgeOKFrame::process(ConnectionImpl *connection) -{ - // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); - - // channel does not exist - if(!channel) return false; - - // report queue purge success - channel->reportQueuePurged(this->messageCount()); - - // done - return true; -} - -// end namespace -} diff --git a/src/queuepurgeokframe.h b/src/queuepurgeokframe.h index 3e0e6c4..715ef00 100644 --- a/src/queuepurgeokframe.h +++ b/src/queuepurgeokframe.h @@ -85,7 +85,20 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override; + virtual bool process(ConnectionImpl *connection) override + { + // check if we have a channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // report queue purge success + channel->reportQueuePurged(this->messageCount()); + + // done + return true; + } }; /** diff --git a/src/queueunbindokframe.cpp b/src/queueunbindokframe.cpp deleted file mode 100644 index e2373e6..0000000 --- a/src/queueunbindokframe.cpp +++ /dev/null @@ -1,32 +0,0 @@ -/** - * QueueUnbindOKFrame.cpp - */ - -#include "includes.h" -#include "queueunbindokframe.h" - -// setup namespace -namespace AMQP { - -/** - * Process the frame - * @param connection The connection over which it was received - * @return bool Was it succesfully processed? - */ -bool QueueUnbindOKFrame::process(ConnectionImpl *connection) -{ - // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); - - // channel does not exist - if(!channel) return false; - - // report queue unbind success - channel->reportQueueUnbound(); - - // done - return true; -} - -// end namespace -} diff --git a/src/queueunbindokframe.h b/src/queueunbindokframe.h index 9ebca90..cd779f0 100644 --- a/src/queueunbindokframe.h +++ b/src/queueunbindokframe.h @@ -64,7 +64,20 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override; + virtual bool process(ConnectionImpl *connection) override + { + // check if we have a channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // report queue unbind success + channel->reportQueueUnbound(); + + // done + return true; + } }; /** diff --git a/src/receivedframe.cpp b/src/receivedframe.cpp index c55ad29..b1053c7 100644 --- a/src/receivedframe.cpp +++ b/src/receivedframe.cpp @@ -6,24 +6,23 @@ * @documentation private */ #include "includes.h" -#include "bodyframe.h" #include "heartbeatframe.h" -#include "connectionstartframe.h" #include "connectionstartokframe.h" +#include "connectionstartframe.h" #include "connectionsecureframe.h" #include "connectionsecureokframe.h" -#include "connectiontuneframe.h" -#include "connectiontuneokframe.h" -#include "connectionopenframe.h" #include "connectionopenokframe.h" -#include "connectioncloseframe.h" +#include "connectionopenframe.h" +#include "connectiontuneokframe.h" +#include "connectiontuneframe.h" #include "connectioncloseokframe.h" +#include "connectioncloseframe.h" #include "channelopenframe.h" #include "channelopenokframe.h" #include "channelflowframe.h" #include "channelflowokframe.h" -#include "channelcloseframe.h" #include "channelcloseokframe.h" +#include "channelcloseframe.h" #include "exchangedeclareframe.h" #include "exchangedeclareokframe.h" #include "exchangedeleteframe.h" @@ -65,9 +64,9 @@ #include "transactioncommitokframe.h" #include "transactionrollbackframe.h" #include "transactionrollbackokframe.h" +#include "messageimpl.h" +#include "bodyframe.h" #include "basicheaderframe.h" -#include "exception.h" -#include "protocolexception.h" #include "framecheck.h" #define TYPE_INVALID 0 diff --git a/tests/myconnection.cpp b/tests/myconnection.cpp index 76b6baf..a98cdfa 100644 --- a/tests/myconnection.cpp +++ b/tests/myconnection.cpp @@ -355,11 +355,12 @@ void MyConnection::onQueueBound(AMQP::Channel *channel) // show std::cout << "AMQP Queue bound" << std::endl; - _connection->setQos(10); -// _channel->setQos(10); +// _connection->setQos(10); + _channel->setQos(1); _channel->publish("my_exchange", "key", "this is the message"); + _channel->consume("my_queue"); } /** @@ -403,3 +404,38 @@ void MyConnection::onQosSet(AMQP::Channel *channel) std::cout << "AMQP Qos set" << std::endl; } +/** + * Method that is called when a consumer was started + * This is the result of a call to Channel::consume() + * @param channel the channel on which the consumer was started + * @param tag the consumer tag + */ +void MyConnection::onConsumerStarted(AMQP::Channel *channel, const std::string &tag) +{ + // show + std::cout << "AMQP consumer started" << std::endl; +} + +/** + * Method that is called when a message has been consumed + * @param channel the channel on which the consumer was started + * @param message the consumed message + */ +void MyConnection::onConsumed(AMQP::Channel *channel, const AMQP::Message &message) +{ + // show + std::cout << "AMQP consumed: " << message.message() << std::endl; +} + +/** + * Method that is called when a consumer was stopped + * This is the result of a call to Channel::cancel() + * @param channel the channel on which the consumer was stopped + * @param tag the consumer tag + */ +void MyConnection::onConsumerStopped(AMQP::Channel *channel, const std::string &tag) +{ + // show + std::cout << "AMQP consumer stopped" << std::endl; +} + diff --git a/tests/myconnection.h b/tests/myconnection.h index fcb0ef2..f84a49b 100644 --- a/tests/myconnection.h +++ b/tests/myconnection.h @@ -157,19 +157,19 @@ private: * Method that is called when an exchange is bound * @param channel */ - virtual void onExchangeBound(AMQP::Channel *channel); + virtual void onExchangeBound(AMQP::Channel *channel) override; /** * Method that is called when an exchange is unbound * @param channel */ - virtual void onExchangeUnbound(AMQP::Channel *channel); + virtual void onExchangeUnbound(AMQP::Channel *channel) override; /** * Method that is called when an exchange is deleted * @param channel */ - virtual void onExchangeDeleted(AMQP::Channel *channel); + virtual void onExchangeDeleted(AMQP::Channel *channel) override; /** * Mehod that is called when an exchange is declared @@ -184,39 +184,62 @@ private: * @param messageCount number of messages in queue * @param consumerCount number of active consumers */ - virtual void onQueueDeclared(AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount); + virtual void onQueueDeclared(AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount) override; /** * Method that is called when a queue is bound * @param channel * @param */ - virtual void onQueueBound(AMQP::Channel *channel); + virtual void onQueueBound(AMQP::Channel *channel) override; /** * Method that is called when a queue is deleted * @param channel * @param messageCount number of messages deleted along with the queue */ - virtual void onQueueDeleted(AMQP::Channel *channel, uint32_t messageCount); + virtual void onQueueDeleted(AMQP::Channel *channel, uint32_t messageCount) override; /** * Method that is called when a queue is unbound * @param channel */ - virtual void onQueueUnbound(AMQP::Channel *channel); + virtual void onQueueUnbound(AMQP::Channel *channel) override; /** * Method that is called when a queue is purged * @param messageCount number of message purged */ - virtual void onQueuePurged(AMQP::Channel *channel, uint32_t messageCount); + virtual void onQueuePurged(AMQP::Channel *channel, uint32_t messageCount) override; /** * Method that is called when the quality-of-service was changed * This is the result of a call to Channel::setQos() */ - virtual void onQosSet(AMQP::Channel *channel); + virtual void onQosSet(AMQP::Channel *channel) override; + + /** + * Method that is called when a consumer was started + * This is the result of a call to Channel::consume() + * @param channel the channel on which the consumer was started + * @param tag the consumer tag + */ + virtual void onConsumerStarted(AMQP::Channel *channel, const std::string &tag) override; + + /** + * Method that is called when a message has been consumed + * @param channel the channel on which the consumer was started + * @param message the consumed message + */ + virtual void onConsumed(AMQP::Channel *channel, const AMQP::Message &message) override; + + /** + * Method that is called when a consumer was stopped + * This is the result of a call to Channel::cancel() + * @param channel the channel on which the consumer was stopped + * @param tag the consumer tag + */ + virtual void onConsumerStopped(AMQP::Channel *channel, const std::string &tag) override; public: