Moved the remaining methods over to deferred handlers

This commit is contained in:
Martijn Otto 2014-04-10 12:51:04 +02:00
parent e1b0e3dea1
commit d2c17869e0
16 changed files with 388 additions and 774 deletions

View File

@ -51,8 +51,8 @@
#include <amqpcpp/exchangetype.h>
#include <amqpcpp/flags.h>
#include <amqpcpp/deferred.h>
#include <amqpcpp/deferredconsumer.h>
#include <amqpcpp/callbacks.h>
#include <amqpcpp/channelhandler.h>
#include <amqpcpp/channelimpl.h>
#include <amqpcpp/channel.h>
#include <amqpcpp/login.h>

View File

@ -23,7 +23,8 @@ private:
std::tuple<
std::deque<Deferred<>>,
std::deque<Deferred<const std::string&, uint32_t, uint32_t>>,
std::deque<Deferred<uint32_t>>
std::deque<Deferred<uint32_t>>,
std::deque<Deferred<const std::string&>>
> _callbacks;
/**

View File

@ -26,9 +26,8 @@ public:
/**
* Construct a channel object
* @param connection
* @param handler
*/
Channel(Connection *connection, ChannelHandler *handler) : _implementation(this, connection, handler) {}
Channel(Connection *connection) : _implementation(this, connection) {}
/**
* Destructor
@ -324,7 +323,7 @@ public:
*
* void myCallback(AMQP::Channel *channel, uint32_t messageCount);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) {
* For example: channel.removeQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) {
*
* std::cout << "Queue deleted, along with " << messageCount << " messages" << std::endl;
*
@ -335,29 +334,20 @@ public:
/**
* Publish a message to an exchange
*
* The following flags can be used
*
* - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method
* - immediate if set, a message that could not immediately be consumed is returned to the onReturned method
*
* If either of the two flags is set, and the message could not immediately
* be published, the message is returned by the server to the client. If you
* want to catch such returned messages, you need to implement the
* ChannelHandler::onReturned() method.
* want to catch such returned messages, you need to install a handler using
* the onReturned() method.
*
* @param exchange the exchange to publish to
* @param routingkey the routing key
* @param flags optional flags (see above)
* @param envelope the full envelope to send
* @param message the message to send
* @param size size of the message
*/
bool publish(const std::string &exchange, const std::string &routingKey, int flags, const Envelope &envelope) { return _implementation.publish(exchange, routingKey, flags, envelope); }
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation.publish(exchange, routingKey, 0, envelope); }
bool publish(const std::string &exchange, const std::string &routingKey, int flags, const std::string &message) { return _implementation.publish(exchange, routingKey, flags, Envelope(message)); }
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation.publish(exchange, routingKey, 0, Envelope(message)); }
bool publish(const std::string &exchange, const std::string &routingKey, int flags, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, flags, Envelope(message, size)); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, 0, Envelope(message, size)); }
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation.publish(exchange, routingKey, envelope); }
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation.publish(exchange, routingKey, Envelope(message)); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, Envelope(message, size)); }
/**
* Set the Quality of Service (QOS) for this channel
@ -392,7 +382,7 @@ public:
* - exclusive request exclusive access, only this consumer can access the queue
* - nowait the server does not have to send a response back that consuming is active
*
* The method ChannelHandler::onConsumerStarted() will be called when the
* The method Deferred::onSuccess() will be called when the
* consumer has started (unless the nowait option was set, in which case
* no confirmation method is called)
*
@ -400,14 +390,26 @@ public:
* @param tag a consumer tag that will be associated with this consume operation
* @param flags additional flags
* @param arguments additional arguments
* @return bool
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, const std::string& tag);
*
* For example: channel.consume("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) {
*
* std::cout << "Started consuming under tag " << tag << std::endl;
*
* });
*/
bool consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments) { return _implementation.consume(queue, tag, flags, arguments); }
bool consume(const std::string &queue, const std::string &tag, int flags = 0) { return _implementation.consume(queue, tag, flags, Table()); }
bool consume(const std::string &queue, const std::string &tag, const Table &arguments) { return _implementation.consume(queue, tag, 0, arguments); }
bool consume(const std::string &queue, int flags, const Table &arguments) { return _implementation.consume(queue, std::string(), flags, arguments); }
bool consume(const std::string &queue, int flags = 0) { return _implementation.consume(queue, std::string(), flags, Table()); }
bool consume(const std::string &queue, const Table &arguments) { return _implementation.consume(queue, std::string(), 0, arguments); }
DeferredConsumer& consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments) { return _implementation.consume(queue, tag, flags, arguments); }
DeferredConsumer& consume(const std::string &queue, const std::string &tag, int flags = 0) { return _implementation.consume(queue, tag, flags, Table()); }
DeferredConsumer& consume(const std::string &queue, const std::string &tag, const Table &arguments) { return _implementation.consume(queue, tag, 0, arguments); }
DeferredConsumer& consume(const std::string &queue, int flags, const Table &arguments) { return _implementation.consume(queue, std::string(), flags, arguments); }
DeferredConsumer& consume(const std::string &queue, int flags = 0) { return _implementation.consume(queue, std::string(), flags, Table()); }
DeferredConsumer& consume(const std::string &queue, const Table &arguments) { return _implementation.consume(queue, std::string(), 0, arguments); }
/**
* Cancel a running consume call
@ -418,27 +420,39 @@ public:
*
* - nowait the server does not have to send a response back that the consumer has been cancelled
*
* The method ChannelHandler::onConsumerStopped() will be called when the consumer
* The method Deferred::onSuccess() will be called when the consumer
* was succesfully stopped (unless the nowait option was used, in which case no
* confirmation method is called)
*
* @param tag the consumer tag
* @param flags optional additional flags
* @return bool
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, const std::string& tag);
*
* For example: channel.cancel("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) {
*
* std::cout << "Stopped consuming under tag " << tag << std::endl;
*
* });
*/
bool cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); }
Deferred<const std::string&>& cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); }
/**
* Acknoldge a received message
*
* When a message is received in the ChannelHandler::onReceived() method,
* you must acknoledge it so that RabbitMQ removes it from the queue (unless
* When a message is received in the DeferredConsumer::onReceived() method,
* you must acknowledge it so that RabbitMQ removes it from the queue (unless
* you are consuming with the noack option). This method can be used for
* this acknoledging.
* this acknowledging.
*
* The following flags are supported:
*
* - multiple acknoledge multiple messages: all un-acked messages that were earlier delivered are acknowledged too
* - multiple acknowledge multiple messages: all un-acked messages that were earlier delivered are acknowledged too
*
* @param deliveryTag the unique delivery tag of the message
* @param flags optional flags
@ -449,8 +463,8 @@ public:
/**
* Reject or nack a message
*
* When a message was received in the ChannelHandler::onReceived() method,
* and you don't want to acknoledge it, you can also choose to reject it by
* When a message was received in the DeferredConsumer::onReceived() method,
* and you don't want to acknowledge it, you can also choose to reject it by
* calling this reject method.
*
* The following flags are supported:

View File

@ -1,87 +0,0 @@
#pragma once
/**
* ChannelHandler.h
*
* Interface that should be implemented by a user of the AMQP library,
* and that is passed to the Connection::createChannel() method.
*
* This interface contains a number of methods that are called when
* the channel changes state.
*
* @copyright 2014 Copernica BV
*/
/**
* Set up namespace
*/
namespace AMQP {
/**
* Class definition
*/
class ChannelHandler
{
public:
/**
* An error has occured on the channel
* The channel is no longer usable after an error has occured on it.
* @param channel the channel on which the error occured
* @param message human readable error message
*/
virtual void onError(Channel *channel, const std::string &message) {}
/**
* Method that is called when a queue is purged
* This is the result of a call to Channel::purgeQueue()
* @param channel the channel on which the queue was emptied
* @param messageCount number of message purged
*/
virtual void onQueuePurged(Channel *channel, uint32_t messageCount) {}
/**
* Method that is called when a consumer was started
* This is the result of a call to Channel::consume()
* @param channel the channel on which the consumer was started
* @param tag the consumer tag
*/
virtual void onConsumerStarted(Channel *channel, const std::string &tag) {}
/**
* Method that is called when a consumer was stopped
* This is the result of a call to Channel::cancel()
* @param channel the channel on which the consumer was stopped
* @param tag the consumer tag
*/
virtual void onConsumerStopped(Channel *channel, const std::string &tag) {}
/**
* Method that is called when a message has been received on a channel
* This message will be called for every message that is received after
* you started consuming. Make sure you acknowledge the messages when its
* safe to remove them from RabbitMQ (unless you set no-ack option when you
* started the consumer)
* @param channel the channel on which the consumer was started
* @param message the consumed message
* @param deliveryTag the delivery tag, you need this to acknowledge the message
* @param consumerTag the consumer identifier that was used to retrieve this message
* @param redelivered is this a redelivered message?
*/
virtual void onReceived(Channel *channel, const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) {}
/**
* Method that is called when a message you tried to publish was returned
* by the server. This only happens when the 'mandatory' or 'immediate' flag
* was set with the Channel::publish() call.
* @param channel the channel on which the message was returned
* @param message the returned message
* @param code the reply code
* @param text human readable reply reason
*/
virtual void onReturned(Channel *channel, const Message &message, int16_t code, const std::string &text) {}
};
/**
* End of namespace
*/
}

View File

@ -32,12 +32,6 @@ private:
*/
ConnectionImpl *_connection;
/**
* The handler that is notified about events
* @var MyChannelHandler
*/
ChannelHandler *_handler;
/**
* Callback when the channel is ready
*/
@ -48,6 +42,11 @@ private:
*/
std::function<void(Channel *channel, const std::string& message)> _errorCallback;
/**
* Callback to execute when a message arrives
*/
std::unique_ptr<DeferredConsumer> _consumer;
/**
* The callbacks waiting to be called
*/
@ -69,12 +68,6 @@ private:
state_closed
} _state = state_connected;
/**
* Is a transaction now active?
* @var bool
*/
bool _transaction = false;
/**
* The message that is now being received
* @var MessageImpl
@ -90,9 +83,8 @@ private:
*
* @param parent the public channel object
* @param connection pointer to the connection
* @param handler handler that is notified on events
*/
ChannelImpl(Channel *parent, Connection *connection, ChannelHandler *handler = nullptr);
ChannelImpl(Channel *parent, Connection *connection);
public:
/**
@ -292,23 +284,17 @@ public:
/**
* Publish a message to an exchange
*
* The following flags can be used
*
* - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method
* - immediate if set, a message that could not immediately be consumed is returned to the onReturned method
*
* If the mandatory or immediate flag is set, and the message could not immediately
* be published, the message will be returned to the client, and will eventually
* end up in your ChannelHandler::onReturned() method.
* end up in your onReturned() handler method.
*
* @param exchange the exchange to publish to
* @param routingkey the routing key
* @param flags optional flags (see above)
* @param envelope the full envelope to send
* @param message the message to send
* @param size size of the message
*/
bool publish(const std::string &exchange, const std::string &routingKey, int flags, const Envelope &envelope);
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope);
/**
* Set the Quality of Service (QOS) of the entire connection
@ -325,20 +311,44 @@ public:
* @param tag a consumer tag that will be associated with this consume operation
* @param flags additional flags
* @param arguments additional arguments
* @return bool
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, const std::string& tag);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) {
*
* std::cout << "Started consuming under tag " << tag << std::endl;
*
* });
*/
bool consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments);
DeferredConsumer& consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments);
/**
* Cancel a running consumer
* @param tag the consumer tag
* @param flags optional flags
* @return bool
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, const std::string& tag);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) {
*
* std::cout << "Started consuming under tag " << tag << std::endl;
*
* });
*/
bool cancel(const std::string &tag, int flags);
Deferred<const std::string&>& cancel(const std::string &tag, int flags);
/**
* Acknoledge a message
* Acknowledge a message
* @param deliveryTag the delivery tag
* @param flags optional flags
* @return bool
@ -446,24 +456,6 @@ public:
_callbacks.reportError(message);
}
/**
* Report that a consumer has started
* @param tag the consumer tag
*/
void reportConsumerStarted(const std::string &tag)
{
if (_handler) _handler->onConsumerStarted(_parent, tag);
}
/**
* Report that a consumer has stopped
* @param tag the consumer tag
*/
void reportConsumerStopped(const std::string &tag)
{
if (_handler) _handler->onConsumerStopped(_parent, tag);
}
/**
* Report that a message was received
*/
@ -475,7 +467,6 @@ public:
* @return MessageImpl
*/
MessageImpl *message(const BasicDeliverFrame &frame);
MessageImpl *message(const BasicReturnFrame &frame);
/**
* Retrieve the current incoming message
@ -486,6 +477,20 @@ public:
return _message;
}
/**
* Report that the consumer has started
*
* @param consumerTag the tag under which we are now consuming
*/
void reportConsumerStarted(const std::string& consumerTag)
{
// if we do not have a consumer, something is very wrong
if (!_consumer) reportError("Received basic consume ok frame, but no consumer was found");
// otherwise, we now report the consumer as started
else _consumer->success(consumerTag);
}
/**
* The channel class is its friend, thus can it instantiate this object
*/

View File

@ -24,16 +24,6 @@ template <typename... Arguments>
class Deferred
{
private:
/**
* The channel we operate under
*/
Channel *_channel;
/**
* Do we already know we failed?
*/
bool _failed;
/**
* Callback to execute on success
*/
@ -49,19 +39,12 @@ private:
*/
std::function<void(Channel *channel, const std::string& error)> _finalizeCallback;
/**
* The channel implementation may call our
* private members and construct us
*/
friend class ChannelImpl;
friend class Callbacks;
/**
* Indicate success
*
* @param parameters... the extra parameters relevant for this deferred handler
*/
void success(Arguments ...parameters)
void success(Arguments ...parameters) const
{
// execute callbacks if registered
if (_successCallback) _successCallback(_channel, parameters...);
@ -84,7 +67,24 @@ private:
}
/**
* Private constructor that can only be called
* The channel implementation may call our
* private members and construct us
*/
friend class ChannelImpl;
friend class Callbacks;
protected:
/**
* The channel we operate under
*/
Channel *_channel;
/**
* Do we already know we failed?
*/
bool _failed;
/**
* Protected constructor that can only be called
* from within the channel implementation
*
* @param channel the channel we operate under

View File

@ -1,6 +1,6 @@
/**
* Class describing a basic cancel ok frame
*
*
* @copyright 2014 Copernica BV
*/
@ -42,7 +42,7 @@ public:
*
* @param frame received frame
*/
BasicCancelOKFrame(ReceivedFrame &frame) :
BasicCancelOKFrame(ReceivedFrame &frame) :
BasicFrame(frame),
_consumerTag(frame)
{}
@ -89,13 +89,13 @@ public:
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
if (!channel) return false;
// report
channel->reportConsumerStopped(consumerTag());
channel->reportSuccess<const std::string&>(consumerTag());
// done
return true;
}

View File

@ -1,6 +1,6 @@
/**
* Class describing a basic consume ok frame
*
*
* @copyright 2014 Copernica BV
*/
@ -52,7 +52,7 @@ public:
*
* @param frame received frame
*/
BasicConsumeOKFrame(ReceivedFrame &frame) :
BasicConsumeOKFrame(ReceivedFrame &frame) :
BasicFrame(frame),
_consumerTag(frame)
{}
@ -89,13 +89,13 @@ public:
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
if (!channel) return false;
// report
channel->reportConsumerStarted(consumerTag());
// done
return true;
}

View File

@ -144,17 +144,8 @@ public:
*/
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// construct the message
channel->message(*this);
// done
return true;
// we no longer support returned messages
return false;
}
};

View File

@ -48,10 +48,9 @@ namespace AMQP {
* @param connection
* @param handler
*/
ChannelImpl::ChannelImpl(Channel *parent, Connection *connection, ChannelHandler *handler) :
ChannelImpl::ChannelImpl(Channel *parent, Connection *connection) :
_parent(parent),
_connection(&connection->_implementation),
_handler(handler)
_connection(&connection->_implementation)
{
// add the channel to the connection
_id = _connection->add(this);
@ -61,9 +60,6 @@ ChannelImpl::ChannelImpl(Channel *parent, Connection *connection, ChannelHandler
{
// this is invalid
_state = state_closed;
// invalid id, this channel can not exist
handler->onError(_parent, "Max number of channels reached");
}
else
{
@ -352,19 +348,13 @@ Deferred<uint32_t>& ChannelImpl::removeQueue(const std::string &name, int flags)
/**
* Publish a message to an exchange
*
* The following flags can be used
*
* - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method
* - immediate if set, a message that could not immediately be consumed is returned to the onReturned method
*
* @param exchange the exchange to publish to
* @param routingkey the routing key
* @param flags optional flags (see above)
* @param envelope the full envelope to send
* @param message the message to send
* @param size size of the message
*/
bool ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, int flags, const Envelope &envelope)
bool ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope)
{
// we are going to send out multiple frames, each one will trigger a call to the handler,
// which in turn could destruct the channel object, we need to monitor that
@ -373,7 +363,7 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin
// @todo do not copy the entire buffer to individual frames
// send the publish frame
if (!send(BasicPublishFrame(_id, exchange, routingKey, flags & mandatory, flags & immediate))) return false;
if (!send(BasicPublishFrame(_id, exchange, routingKey))) return false;
// channel still valid?
if (!monitor.valid()) return false;
@ -432,27 +422,65 @@ Deferred<>& ChannelImpl::setQos(uint16_t prefetchCount)
* @param tag a consumer tag that will be associated with this consume operation
* @param flags additional flags
* @param arguments additional arguments
* @return bool
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, const std::string& tag);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) {
*
* std::cout << "Started consuming under tag " << tag << std::endl;
*
* });
*/
bool ChannelImpl::consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments)
DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments)
{
// send a consume frame
return send(BasicConsumeFrame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, flags & nowait, arguments));
// create the deferred consumer
_consumer = std::unique_ptr<DeferredConsumer>(new DeferredConsumer(_parent, false));
// can we send the basic consume frame?
if (!send(BasicConsumeFrame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, flags & nowait, arguments)))
{
// we set the consumer to be failed immediately
_consumer->_failed = true;
// we should call the error function later
// TODO
}
// return the consumer
return *_consumer;
}
/**
* Cancel a running consumer
* @param tag the consumer tag
* @param flags optional flags
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, const std::string& tag);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) {
*
* std::cout << "Started consuming under tag " << tag << std::endl;
*
* });
*/
bool ChannelImpl::cancel(const std::string &tag, int flags)
Deferred<const std::string&>& ChannelImpl::cancel(const std::string &tag, int flags)
{
// send a cancel frame
return send(BasicCancelFrame(_id, tag, flags & nowait));
return send<const std::string&>(BasicCancelFrame(_id, tag, flags & nowait), "Cannot send basic cancel frame");
}
/**
* Acknoledge a message
* Acknowledge a message
* @param deliveryTag the delivery tag
* @param flags optional flags
* @return bool
@ -541,14 +569,23 @@ void ChannelImpl::reportMessage()
// skip if there is no message
if (!_message) return;
// after the report the channel may be destructed, monitor that
Monitor monitor(this);
// do we even have a consumer?
if (!_consumer)
{
// this should not be possible: receiving a message without doing a consume() call
reportError("Received message without having a consumer");
}
else
{
// after the report the channel may be destructed, monitor that
Monitor monitor(this);
// do we have a handler?
if (_handler) _message->report(_parent, _handler);
// send message to the consumer
_message->report(*_consumer);
// skip if channel was destructed
if (!monitor.valid()) return;
// skip if channel was destructed
if (!monitor.valid()) return;
}
// no longer need the message
delete _message;
@ -569,20 +606,6 @@ MessageImpl *ChannelImpl::message(const BasicDeliverFrame &frame)
return _message = new ConsumedMessage(frame);
}
/**
* Create an incoming message
* @param frame
* @return MessageImpl
*/
MessageImpl *ChannelImpl::message(const BasicReturnFrame &frame)
{
// it should not be possible that a message already exists, but lets check it anyhow
if (_message) delete _message;
// construct a message
return _message = new ReturnedMessage(frame);
}
/**
* End of namespace
*/

View File

@ -26,7 +26,7 @@ private:
* @var uint64_t
*/
uint64_t _deliveryTag;
/**
* Is this a redelivered message?
* @var bool
@ -39,25 +39,24 @@ public:
* Constructor
* @param frame
*/
ConsumedMessage(const BasicDeliverFrame &frame) :
MessageImpl(frame.exchange(), frame.routingKey()),
ConsumedMessage(const BasicDeliverFrame &frame) :
MessageImpl(frame.exchange(), frame.routingKey()),
_consumerTag(frame.consumerTag()), _deliveryTag(frame.deliveryTag()), _redelivered(frame.redelivered())
{}
/**
* Destructor
*/
virtual ~ConsumedMessage() {}
/**
* Report to the handler
* @param channel
* @param handler
* @param consumer
*/
virtual void report(Channel *channel, ChannelHandler *handler) override
virtual void report(const DeferredConsumer& consumer) override
{
// report to the handler
handler->onReceived(channel, *this, _deliveryTag, _consumerTag, _redelivered);
// send ourselves to the consumer
consumer.message(*this, _deliveryTag, _consumerTag, _redelivered);
}
};

View File

@ -20,10 +20,10 @@ class MessageImpl : public Message
private:
/**
* How many bytes have been received?
* @var uint64_t
* @var uint64_t
*/
uint64_t _received;
/**
* Was the buffer allocated by us?
* @var bool
@ -36,8 +36,8 @@ protected:
* @param exchange
* @param routingKey
*/
MessageImpl(const std::string &exchange, const std::string &routingKey) :
Message(exchange, routingKey),
MessageImpl(const std::string &exchange, const std::string &routingKey) :
Message(exchange, routingKey),
_received(0), _selfAllocated(false)
{}
@ -45,12 +45,12 @@ public:
/**
* Destructor
*/
virtual ~MessageImpl()
virtual ~MessageImpl()
{
// clear up memory if it was self allocated
if (_selfAllocated) delete[] _body;
}
/**
* Set the body size
* This field is set when the header is received
@ -60,7 +60,7 @@ public:
{
_bodySize = size;
}
/**
* Append data
* @param buffer incoming data
@ -84,27 +84,26 @@ public:
// it does not yet fit, do we have to allocate?
if (!_body) _body = new char[_bodySize];
_selfAllocated = true;
// prevent that size is too big
if (size > _bodySize - _received) size = _bodySize - _received;
// append data
memcpy((char *)(_body + _received), buffer, size);
// we have more data now
_received += size;
// done
return _received >= _bodySize;
}
}
/**
* Report to the handler
* @param channel
* @param handler
* @param consumer
*/
virtual void report(Channel *channel, ChannelHandler *handler) = 0;
virtual void report(const DeferredConsumer& consumer) = 0;
};
/**

View File

@ -24,7 +24,7 @@ private:
* @var int16_t
*/
int16_t _replyCode;
/**
* The reply message
* @var string
@ -40,21 +40,19 @@ public:
ReturnedMessage(const BasicReturnFrame &frame) :
MessageImpl(frame.exchange(), frame.routingKey()),
_replyCode(frame.replyCode()), _replyText(frame.replyText()) {}
/**
* Destructor
*/
virtual ~ReturnedMessage() {}
/**
* Report to the handler
* @param channel
* @param handler
* @param consumer
*/
virtual void report(Channel *channel, ChannelHandler *handler) override
virtual void report(const DeferredConsumer& consumer) override
{
// report to the handler
handler->onReturned(channel, *this, _replyCode, _replyText);
// we no longer support returned messages
}
};

View File

@ -30,13 +30,13 @@ using namespace Copernica;
* @return int
*/
int main(int argc, const char *argv[])
{
{
// need an ip
if (argc != 2)
{
// report error
std::cerr << "usage: " << argv[0] << " <ip>" << std::endl;
// done
return -1;
}

View File

@ -3,13 +3,13 @@
*
* @copyright 2014 Copernica BV
*/
/**
* Required external libraries
*/
#include <amqpcpp.h>
#include <copernica/network.h>
#include <string>
/**
@ -17,7 +17,7 @@
*/
using namespace std;
using namespace Copernica;
/**
* Required local class definitions
*/
@ -33,23 +33,11 @@ MyConnection::MyConnection(const std::string &ip) :
{
// start connecting
if (_socket.connect(Network::Ipv4Address(ip), 5672)) return;
// failure
onFailure(&_socket);
}
/**
* Destructor
*/
MyConnection::~MyConnection()
{
// do we still have a channel?
if (_channel) delete _channel;
// do we still have a connection?
if (_connection) delete _connection;
}
/**
* Method that is called when the connection failed
* @param socket Pointer to the socket
@ -78,19 +66,118 @@ void MyConnection::onConnected(Network::TcpSocket *socket)
{
// report connection
std::cout << "connected" << std::endl;
// we are connected, leap out if there already is a amqp connection
if (_connection) return;
// create amqp connection, and a new channel
_connection = new AMQP::Connection(this, AMQP::Login("guest", "guest"), "/");
_channel = new AMQP::Channel(_connection, this);
// we declare a queue, an exchange and we publish a message
_channel->declareQueue("my_queue");
// _channel->declareQueue("my_queue", AMQP::autodelete);
_channel->declareExchange("my_exchange", AMQP::direct);
_channel->bindQueue("my_exchange", "my_queue", "key");
_connection = std::unique_ptr<AMQP::Connection>(new AMQP::Connection(this, AMQP::Login("guest", "guest"), "/"));
_channel = std::unique_ptr<AMQP::Channel>(new AMQP::Channel(_connection.get()));
// watch for the channel becoming ready
_channel->onReady([](AMQP::Channel *channel) {
// show that we are ready
std::cout << "AMQP channel ready, id: " << (int) channel->id() << std::endl;
});
// and of course for channel errors
_channel->onError([this](AMQP::Channel *channel, const std::string& message) {
// inform the user of the error
std::cerr << "AMQP channel error on channel " << channel->id() << ": " << message << std::endl;
// delete the channel
_channel = nullptr;
// close the connection
_connection->close();
});
// declare a queue and let us know when it succeeds
_channel->declareQueue("my_queue").onSuccess([](AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount){
// queue was successfully declared
std::cout << "AMQP Queue declared with name '" << name << "', " << messageCount << " messages and " << consumerCount << " consumer" << std::endl;
});
// also declare an exchange
_channel->declareExchange("my_exchange", AMQP::direct).onSuccess([](AMQP::Channel *channel) {
// exchange successfully declared
std::cout << "AMQP exchange declared" << std::endl;
});
// bind the queue to the exchange
_channel->bindQueue("my_exchange", "my_queue", "key").onSuccess([](AMQP::Channel *channel) {
// queue successfully bound to exchange
std::cout << "AMQP Queue bound" << std::endl;
});
// set quality of service
_channel->setQos(1).onSuccess([](AMQP::Channel *channel) {
// quality of service successfully set
std::cout << "AMQP Quality of Service set" << std::endl;
});
// publish a message to the exchange
if (!_channel->publish("my_exchange", "key", "my_message"))
{
// we could not publish the message, something is wrong somewhere
std::cerr << "Unable to publish message" << std::endl;
// close the channel
_channel->close().onSuccess([this](AMQP::Channel *channel) {
// also close the connection
_connection->close();
});
}
// consume the message we just published
_channel->consume("my_queue", "my_consumer", AMQP::exclusive)
.onReceived([this](AMQP::Channel *channel, const AMQP::Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) {
// show the message data
std::cout << "AMQP consumed: " << message.message() << std::endl;
// ack the message
_channel->ack(deliveryTag);
// and stop consuming (there is only one message anyways)
_channel->cancel("my_consumer").onSuccess([](AMQP::Channel *channel, const std::string& tag) {
// we successfully stopped consuming
std::cout << "Stopped consuming under tag " << tag << std::endl;
});
// unbind the queue again
_channel->unbindQueue("my_exchange", "my_queue", "key").onSuccess([](AMQP::Channel *channel) {
// queueu successfully unbound
std::cout << "Queue unbound" << std::endl;
});
// the queue should now be empty, so we can delete it
_channel->removeQueue("my_queue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) {
// queue was removed, it should have been empty, so messageCount should be 0
if (messageCount) std::cerr << "Removed queue which should have been empty but contained " << messageCount << " messages" << std::endl;
// no messages is the expected behavior
else std::cout << "Queue removed" << std::endl;
});
// also remove the exchange
_channel->removeExchange("my_exchange").onSuccess([](AMQP::Channel *channel) {
// exchange was successfully removed
std::cout << "Removed exchange" << std::endl;
});
// everything done, close the channel
_channel->close().onSuccess([this](AMQP::Channel *channel) {
// channel was closed
std::cout << "Channel closed" << std::endl;
// close the connection too
_connection->close();
});
})
.onSuccess([](AMQP::Channel *channel, const std::string& tag) {
// consumer was started
std::cout << "Started consuming under tag " << tag << std::endl;
});
}
/**
@ -103,12 +190,11 @@ void MyConnection::onClosed(Network::TcpSocket *socket)
std::cout << "myconnection closed" << std::endl;
// close the channel and connection
if (_channel) delete _channel;
if (_connection) delete _connection;
// set to null
_channel = nullptr;
_connection = nullptr;
// stop the loop
Event::MainLoop::instance()->stop();
}
/**
@ -119,14 +205,13 @@ void MyConnection::onLost(Network::TcpSocket *socket)
{
// report error
std::cout << "connection lost" << std::endl;
// close the channel and connection
if (_channel) delete _channel;
if (_connection) delete _connection;
// set to null
_channel = nullptr;
_connection = nullptr;
// stop the loop
Event::MainLoop::instance()->stop();
}
/**
@ -136,15 +221,12 @@ void MyConnection::onLost(Network::TcpSocket *socket)
*/
void MyConnection::onData(Network::TcpSocket *socket, Network::Buffer *buffer)
{
// send what came in
std::cout << "received: " << buffer->size() << " bytes" << std::endl;
// leap out if there is no connection
if (!_connection) return;
// let the data be handled by the connection
size_t bytes = _connection->parse(buffer->data(), buffer->size());
// shrink the buffer
buffer->shrink(bytes);
}
@ -152,7 +234,7 @@ void MyConnection::onData(Network::TcpSocket *socket, Network::Buffer *buffer)
/**
* Method that is called when data needs to be sent over the network
*
* Note that the AMQP library does no buffering by itself. This means
* Note that the AMQP library does no buffering by itself. This means
* that this method should always send out all data or do the buffering
* itself.
*
@ -162,21 +244,27 @@ void MyConnection::onData(Network::TcpSocket *socket, Network::Buffer *buffer)
*/
void MyConnection::onData(AMQP::Connection *connection, const char *buffer, size_t size)
{
// // report what is going on
// std::cout << "send: " << size << std::endl;
//
// for (unsigned i=0; i<size; i++) std::cout << (int)buffer[i] << " ";
// std::cout << std::endl;
// send to the socket
_socket.write(buffer, size);
}
/**
* Method that is called when the connection to AMQP was closed
* @param connection pointer to connection object
*/
void MyConnection::onClosed(AMQP::Connection *connection)
{
// report that AMQP connection is closed
std::cout << "AMQP connection closed" << std::endl;
// close the underlying socket
_socket.close();
}
/**
* When the connection ends up in an error state this method is called.
* This happens when data comes in that does not match the AMQP protocol
*
*
* After this method is called, the connection no longer is in a valid
* state and can be used. In normal circumstances this method is not called.
*
@ -199,265 +287,7 @@ void MyConnection::onConnected(AMQP::Connection *connection)
{
// show
std::cout << "AMQP login success" << std::endl;
// create channel if it does not yet exist
if (!_channel) _channel = new AMQP::Channel(connection, this);
if (!_channel) _channel = std::unique_ptr<AMQP::Channel>(new AMQP::Channel(connection));
}
/**
* Method that is called when the channel was succesfully created.
* Only after the channel was created, you can use it for subsequent messages over it
* @param channel
*/
void MyConnection::onReady(AMQP::Channel *channel)
{
// show
std::cout << "AMQP channel ready, id: " << (int) channel->id() << std::endl;
}
/**
* An error has occured on the channel
* @param channel
* @param message
*/
void MyConnection::onError(AMQP::Channel *channel, const std::string &message)
{
// show
std::cout << "AMQP channel error, id: " << (int) channel->id() << " - message: " << message << std::endl;
// main channel cause an error, get rid of if
delete _channel;
// reset pointer
_channel = nullptr;
}
/**
* Method that is called when the channel was paused
* @param channel
*/
void MyConnection::onPaused(AMQP::Channel *channel)
{
// show
std::cout << "AMQP channel paused" << std::endl;
}
/**
* Method that is called when the channel was resumed
* @param channel
*/
void MyConnection::onResumed(AMQP::Channel *channel)
{
// show
std::cout << "AMQP channel resumed" << std::endl;
}
/**
* Method that is called when a channel is closed
* @param channel
*/
void MyConnection::onClosed(AMQP::Channel *channel)
{
// show
std::cout << "AMQP channel closed" << std::endl;
}
/**
* Method that is called when a transaction was started
* @param channel
*/
void MyConnection::onTransactionStarted(AMQP::Channel *channel)
{
// show
std::cout << "AMQP transaction started" << std::endl;
}
/**
* Method that is called when a transaction was committed
* @param channel
*/
void MyConnection::onTransactionCommitted(AMQP::Channel *channel)
{
// show
std::cout << "AMQP transaction committed" << std::endl;
}
/**
* Method that is called when a transaction was rolled back
* @param channel
*/
void MyConnection::onTransactionRolledBack(AMQP::Channel *channel)
{
// show
std::cout << "AMQP transaction rolled back" << std::endl;
}
/**
* Mehod that is called when an exchange is declared
* @param channel
*/
void MyConnection::onExchangeDeclared(AMQP::Channel *channel)
{
// show
std::cout << "AMQP exchange declared" << std::endl;
}
/**
* Method that is called when an exchange is bound
* @param channel
*/
void MyConnection::onExchangeBound(AMQP::Channel *channel)
{
// show
std::cout << "AMQP Exchange bound" << std::endl;
}
/**
* Method that is called when an exchange is unbound
* @param channel
*/
void MyConnection::onExchangeUnbound(AMQP::Channel *channel)
{
// show
std::cout << "AMQP Exchange unbound" << std::endl;
}
/**
* Method that is called when an exchange is deleted
* @param channel
*/
void MyConnection::onExchangeDeleted(AMQP::Channel *channel)
{
// show
std::cout << "AMQP Exchange deleted" << std::endl;
}
/**
* Method that is called when a queue is declared
* @param channel
* @param name name of the queue
* @param messageCount number of messages in queue
* @param consumerCount number of active consumers
*/
void MyConnection::onQueueDeclared(AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount)
{
// show
std::cout << "AMQP Queue declared" << std::endl;
}
/**
* Method that is called when a queue is bound
* @param channel
* @param
*/
void MyConnection::onQueueBound(AMQP::Channel *channel)
{
// show
std::cout << "AMQP Queue bound" << std::endl;
// _connection->setQos(10);
// _channel->setQos(1);
_channel->publish("my_exchange", "invalid-key", AMQP::mandatory, "this is the message");
// _channel->consume("my_queue");
}
/**
* Method that is called when a queue is deleted
* @param channel
* @param messageCount number of messages deleted along with the queue
*/
void MyConnection::onQueueDeleted(AMQP::Channel *channel, uint32_t messageCount)
{
// show
std::cout << "AMQP Queue deleted" << std::endl;
}
/**
* Method that is called when a queue is unbound
* @param channel
*/
void MyConnection::onQueueUnbound(AMQP::Channel *channel)
{
// show
std::cout << "AMQP Queue unbound" << std::endl;
}
/**
* Method that is called when a queue is purged
* @param messageCount number of message purged
*/
void MyConnection::onQueuePurged(AMQP::Channel *channel, uint32_t messageCount)
{
// show
std::cout << "AMQP Queue purged" << std::endl;
}
/**
* Method that is called when the quality-of-service was changed
* This is the result of a call to Channel::setQos()
*/
void MyConnection::onQosSet(AMQP::Channel *channel)
{
// show
std::cout << "AMQP Qos set" << std::endl;
}
/**
* Method that is called when a consumer was started
* This is the result of a call to Channel::consume()
* @param channel the channel on which the consumer was started
* @param tag the consumer tag
*/
void MyConnection::onConsumerStarted(AMQP::Channel *channel, const std::string &tag)
{
// show
std::cout << "AMQP consumer started" << std::endl;
}
/**
* Method that is called when a message has been received on a channel
* @param channel the channel on which the consumer was started
* @param message the consumed message
* @param deliveryTag the delivery tag, you need this to acknowledge the message
* @param consumerTag the consumer identifier that was used to retrieve this message
* @param redelivered is this a redelivered message?
*/
void MyConnection::onReceived(AMQP::Channel *channel, const AMQP::Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered)
{
// show
std::cout << "AMQP consumed: " << message.message() << std::endl;
// ack the message
channel->ack(deliveryTag);
}
/**
* Method that is called when a message you tried to publish was returned
* by the server. This only happens when the 'mandatory' or 'immediate' flag
* was set with the Channel::publish() call.
* @param channel the channel on which the message was returned
* @param message the returned message
* @param code the reply code
* @param text human readable reply reason
*/
void MyConnection::onReturned(AMQP::Channel *channel, const AMQP::Message &message, int16_t code, const std::string &text)
{
// show
std::cout << "AMQP message returned: " << text << std::endl;
}
/**
* Method that is called when a consumer was stopped
* This is the result of a call to Channel::cancel()
* @param channel the channel on which the consumer was stopped
* @param tag the consumer tag
*/
void MyConnection::onConsumerStopped(AMQP::Channel *channel, const std::string &tag)
{
// show
std::cout << "AMQP consumer stopped" << std::endl;
}

View File

@ -9,9 +9,8 @@
/**
* Class definition
*/
class MyConnection :
class MyConnection :
public AMQP::ConnectionHandler,
public AMQP::ChannelHandler,
public Network::TcpHandler
{
private:
@ -20,18 +19,18 @@ private:
* @var TcpSocket
*/
Network::TcpSocket _socket;
/**
* The AMQP connection
* @var Connection
*/
AMQP::Connection *_connection;
std::unique_ptr<AMQP::Connection> _connection;
/**
* The AMQP channel
* @var Channel
*/
AMQP::Channel *_channel;
std::unique_ptr<AMQP::Channel> _channel;
/**
* Method that is called when the connection failed
@ -44,13 +43,13 @@ private:
* @param socket Pointer to the socket
*/
virtual void onTimeout(Network::TcpSocket *socket) override;
/**
* Method that is called when the connection succeeded
* @param socket Pointer to the socket
*/
virtual void onConnected(Network::TcpSocket *socket) override;
/**
* Method that is called when the socket is closed (as a result of a TcpSocket::close() call)
* @param socket Pointer to the socket
@ -62,18 +61,18 @@ private:
* @param socket Pointer to the socket
*/
virtual void onLost(Network::TcpSocket *socket) override;
/**
* Method that is called when data is received on the socket
* @param socket Pointer to the socket
* @param buffer Pointer to the fill input buffer
*/
virtual void onData(Network::TcpSocket *socket, Network::Buffer *buffer) override;
/**
* Method that is called when data needs to be sent over the network
*
* Note that the AMQP library does no buffering by itself. This means
* Note that the AMQP library does no buffering by itself. This means
* that this method should always send out all data or do the buffering
* itself.
*
@ -82,11 +81,17 @@ private:
* @param size Size of the buffer
*/
virtual void onData(AMQP::Connection *connection, const char *buffer, size_t size) override;
/**
* Method that is called when the connection to AMQP was closed
* @param connection pointer to connection object
*/
virtual void onClosed(AMQP::Connection *connection) override;
/**
* When the connection ends up in an error state this method is called.
* This happens when data comes in that does not match the AMQP protocol
*
*
* After this method is called, the connection no longer is in a valid
* state and can be used. In normal circumstances this method is not called.
*
@ -103,175 +108,11 @@ private:
*/
virtual void onConnected(AMQP::Connection *connection) override;
/**
* Method that is called when the channel was succesfully created.
* Only after the channel was created, you can use it for subsequent messages over it
* @param channel
*/
virtual void onReady(AMQP::Channel *channel) override;
/**
* An error has occured on the channel
* @param channel
* @param message
*/
virtual void onError(AMQP::Channel *channel, const std::string &message) override;
/**
* Method that is called when the channel was paused
* @param channel
*/
virtual void onPaused(AMQP::Channel *channel) override;
/**
* Method that is called when the channel was resumed
* @param channel
*/
virtual void onResumed(AMQP::Channel *channel) override;
/**
* Method that is called when a channel is closed
* @param channel
*/
virtual void onClosed(AMQP::Channel *channel) override;
/**
* Method that is called when a transaction was started
* @param channel
*/
virtual void onTransactionStarted(AMQP::Channel *channel) override;
/**
* Method that is called when a transaction was committed
* @param channel
*/
virtual void onTransactionCommitted(AMQP::Channel *channel) override;
/**
* Method that is called when a transaction was rolled back
* @param channel
*/
virtual void onTransactionRolledBack(AMQP::Channel *channel) override;
/**
* Method that is called when an exchange is bound
* @param channel
*/
virtual void onExchangeBound(AMQP::Channel *channel) override;
/**
* Method that is called when an exchange is unbound
* @param channel
*/
virtual void onExchangeUnbound(AMQP::Channel *channel) override;
/**
* Method that is called when an exchange is deleted
* @param channel
*/
virtual void onExchangeDeleted(AMQP::Channel *channel) override;
/**
* Mehod that is called when an exchange is declared
* @param channel
*/
virtual void onExchangeDeclared(AMQP::Channel *channel) override;
/**
* Method that is called when a queue is declared
* @param channel
* @param name name of the queue
* @param messageCount number of messages in queue
* @param consumerCount number of active consumers
*/
virtual void onQueueDeclared(AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount) override;
/**
* Method that is called when a queue is bound
* @param channel
* @param
*/
virtual void onQueueBound(AMQP::Channel *channel) override;
/**
* Method that is called when a queue is deleted
* @param channel
* @param messageCount number of messages deleted along with the queue
*/
virtual void onQueueDeleted(AMQP::Channel *channel, uint32_t messageCount) override;
/**
* Method that is called when a queue is unbound
* @param channel
*/
virtual void onQueueUnbound(AMQP::Channel *channel) override;
/**
* Method that is called when a queue is purged
* @param messageCount number of message purged
*/
virtual void onQueuePurged(AMQP::Channel *channel, uint32_t messageCount) override;
/**
* Method that is called when the quality-of-service was changed
* This is the result of a call to Channel::setQos()
*/
virtual void onQosSet(AMQP::Channel *channel) override;
/**
* Method that is called when a consumer was started
* This is the result of a call to Channel::consume()
* @param channel the channel on which the consumer was started
* @param tag the consumer tag
*/
virtual void onConsumerStarted(AMQP::Channel *channel, const std::string &tag) override;
/**
* Method that is called when a consumer was stopped
* This is the result of a call to Channel::cancel()
* @param channel the channel on which the consumer was stopped
* @param tag the consumer tag
*/
virtual void onConsumerStopped(AMQP::Channel *channel, const std::string &tag) override;
/**
* Method that is called when a message has been received on a channel
* This message will be called for every message that is received after
* you started consuming. Make sure you acknowledge the messages when its
* safe to remove them from RabbitMQ (unless you set no-ack option when you
* started the consumer)
* @param channel the channel on which the consumer was started
* @param message the consumed message
* @param deliveryTag the delivery tag, you need this to acknowledge the message
* @param consumerTag the consumer identifier that was used to retrieve this message
* @param redelivered is this a redelivered message?
*/
virtual void onReceived(AMQP::Channel *channel, const AMQP::Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) override;
/**
* Method that is called when a message you tried to publish was returned
* by the server. This only happens when the 'mandatory' or 'immediate' flag
* was set with the Channel::publish() call.
* @param channel the channel on which the message was returned
* @param message the returned message
* @param code the reply code
* @param text human readable reply reason
*/
virtual void onReturned(AMQP::Channel *channel, const AMQP::Message &message, int16_t code, const std::string &text) override;
public:
/**
* Constructor
* @param ip
*/
MyConnection(const std::string &ip);
/**
* Destructor
*/
virtual ~MyConnection();
};