AMQP-CPP/src/channelimpl.cpp

695 lines
19 KiB
C++
Raw Normal View History

/**
* Channel.cpp
*
* Implementation for a channel
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "basicdeliverframe.h"
2014-01-06 22:49:31 +08:00
#include "basicreturnframe.h"
#include "messageimpl.h"
2014-01-06 22:49:31 +08:00
#include "consumedmessage.h"
#include "returnedmessage.h"
#include "channelopenframe.h"
#include "channelflowframe.h"
#include "channelcloseokframe.h"
#include "channelcloseframe.h"
#include "transactionselectframe.h"
#include "transactioncommitframe.h"
#include "transactionrollbackframe.h"
#include "exchangedeclareframe.h"
#include "exchangedeleteframe.h"
#include "exchangebindframe.h"
#include "exchangeunbindframe.h"
#include "queuedeclareframe.h"
#include "queuebindframe.h"
#include "queueunbindframe.h"
#include "queuepurgeframe.h"
#include "queuedeleteframe.h"
#include "basicpublishframe.h"
#include "basicheaderframe.h"
#include "bodyframe.h"
#include "basicqosframe.h"
#include "basicconsumeframe.h"
#include "basiccancelframe.h"
2014-01-06 04:21:09 +08:00
#include "basicackframe.h"
2014-01-06 21:28:58 +08:00
#include "basicnackframe.h"
2014-01-06 21:38:48 +08:00
#include "basicrecoverframe.h"
#include "basicrejectframe.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Construct a channel object
* @param parent
* @param connection
* @param handler
*/
ChannelImpl::ChannelImpl(Channel *parent, Connection *connection) :
_parent(parent),
_connection(&connection->_implementation)
{
// add the channel to the connection
_id = _connection->add(this);
// check if the id is valid
if (_id == 0)
{
// this is invalid
_state = state_closed;
}
else
{
// busy connecting
_state = state_connected;
// valid id, send a channel open frame
send(ChannelOpenFrame(_id));
}
}
/**
* Destructor
*/
ChannelImpl::~ChannelImpl()
{
// remove incoming message
if (_message) delete _message;
_message = nullptr;
// remove this channel from the connection (but not if the connection is already destructed)
if (_connection) _connection->remove(this);
// close the channel now
close();
// destruct deferred results
while (_oldestCallback) _oldestCallback.reset(_oldestCallback->next());
}
/**
* Push a deferred result
* @param result The deferred object to push
*/
2014-04-15 19:01:27 +08:00
Deferred &ChannelImpl::push(Deferred *deferred)
{
// do we already have an oldest?
if (!_oldestCallback) _oldestCallback.reset(deferred);
// do we already have a newest?
if (_newestCallback) _newestCallback->add(deferred);
// store newest callback
_newestCallback = deferred;
2014-04-15 19:01:27 +08:00
// done
return *deferred;
}
/**
* Send a frame and push a deferred result
* @param frame The frame to send
*/
Deferred &ChannelImpl::push(const Frame &frame)
{
// send the frame, and push the result
return push(new Deferred(send(frame)));
}
/**
* Pause deliveries on a channel
*
* This will stop all incoming messages
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &ChannelImpl::pause()
{
// send a channel flow frame
2014-04-15 19:01:27 +08:00
return push(ChannelFlowFrame(_id, false));
}
/**
* Resume a paused channel
*
* This will resume incoming messages
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &ChannelImpl::resume()
{
// send a channel flow frame
2014-04-15 19:01:27 +08:00
return push(ChannelFlowFrame(_id, true));
}
/**
* Start a transaction
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &ChannelImpl::startTransaction()
{
// send a transaction frame
2014-04-15 19:01:27 +08:00
return push(TransactionSelectFrame(_id));
}
/**
* Commit the current transaction
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &ChannelImpl::commitTransaction()
{
// send a transaction frame
2014-04-15 19:01:27 +08:00
return push(TransactionCommitFrame(_id));
}
/**
* Rollback the current transaction
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &ChannelImpl::rollbackTransaction()
{
// send a transaction frame
2014-04-15 19:01:27 +08:00
return push(TransactionRollbackFrame(_id));
}
/**
* Close the current channel
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &ChannelImpl::close()
{
// send a channel close frame
2014-04-15 19:01:27 +08:00
auto &handler = push(ChannelCloseFrame(_id));
// was the frame sent and are we still alive?
2014-04-15 19:01:27 +08:00
if (handler) _state = state_closing;
// done
return handler;
}
/**
* declare an exchange
* @param name name of the exchange to declare
* @param type type of exchange
* @param flags additional settings for the exchange
* @param arguments additional arguments
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &ChannelImpl::declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments)
{
// convert exchange type
std::string exchangeType;
if (type == ExchangeType::fanout) exchangeType = "fanout";
if (type == ExchangeType::direct) exchangeType = "direct";
if (type == ExchangeType::topic) exchangeType = "topic";
if (type == ExchangeType::headers)exchangeType = "headers";
// send declare exchange frame
return push(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, false, arguments));
}
/**
* bind an exchange
*
* @param source exchange which binds to target
* @param target exchange to bind to
* @param routingKey routing key
* @param arguments additional arguments for binding
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments)
{
// send exchange bind frame
return push(ExchangeBindFrame(_id, target, source, routingkey, false, arguments));
}
/**
* unbind two exchanges
*
* @param source the source exchange
* @param target the target exchange
* @param routingkey the routing key
* @param arguments additional unbind arguments
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments)
{
// send exchange unbind frame
return push(ExchangeUnbindFrame(_id, target, source, routingkey, false, arguments));
}
/**
* remove an exchange
*
* @param name name of the exchange to remove
* @param flags additional settings for deleting the exchange
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &ChannelImpl::removeExchange(const std::string &name, int flags)
{
// send delete exchange frame
return push(ExchangeDeleteFrame(_id, name, flags & ifunused, false));
}
/**
* declare a queue
* @param name queue name
* @param flags additional settings for the queue
* @param arguments additional arguments
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
DeferredQueue &ChannelImpl::declareQueue(const std::string &name, int flags, const Table &arguments)
{
// the frame to send
QueueDeclareFrame frame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, false, arguments);
// send the queuedeclareframe
auto *result = new DeferredQueue(send(frame));
// add the deferred result
2014-04-15 19:01:27 +08:00
push(result);
// done
return *result;
}
/**
* Bind a queue to an exchange
*
* @param exchangeName name of the exchange to bind to
* @param queueName name of the queue
* @param routingkey routingkey
* @param arguments additional arguments
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, const Table &arguments)
{
// send the bind queue frame
return push(QueueBindFrame(_id, queueName, exchangeName, routingkey, false, arguments));
}
/**
* Unbind a queue from an exchange
*
* @param exchange the source exchange
* @param queue the target queue
* @param routingkey the routing key
* @param arguments additional bind arguments
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &ChannelImpl::unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments)
{
// send the unbind queue frame
2014-04-15 19:01:27 +08:00
return push(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments));
}
/**
* Purge a queue
* @param queue queue to purge
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, uint32_t messageCount);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) {
*
* std::cout << "Queue purged, all " << messageCount << " messages removed" << std::endl;
*
* });
*/
DeferredDelete &ChannelImpl::purgeQueue(const std::string &name)
{
// the frame to send
QueuePurgeFrame frame(_id, name, false);
// send the frame, and create deferred object
auto *deferred = new DeferredDelete(send(frame));
// push to list
2014-04-15 19:01:27 +08:00
push(deferred);
// done
return *deferred;
}
/**
* Remove a queue
* @param queue queue to remove
* @param flags additional flags
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, uint32_t messageCount);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) {
*
* std::cout << "Queue deleted, along with " << messageCount << " messages" << std::endl;
*
* });
*/
DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags)
{
// the frame to send
QueueDeleteFrame frame(_id, name, flags & ifunused, flags & ifempty, false);
// send the frame, and create deferred object
auto *deferred = new DeferredDelete(send(frame));
// push to list
2014-04-15 19:01:27 +08:00
push(deferred);
// done
return *deferred;
}
/**
* Publish a message to an exchange
*
* @param exchange the exchange to publish to
* @param routingkey the routing key
* @param envelope the full envelope to send
* @param message the message to send
* @param size size of the message
*/
bool ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope)
{
// we are going to send out multiple frames, each one will trigger a call to the handler,
// which in turn could destruct the channel object, we need to monitor that
Monitor monitor(this);
// @todo do not copy the entire buffer to individual frames
// send the publish frame
if (!send(BasicPublishFrame(_id, exchange, routingKey))) return false;
// channel still valid?
if (!monitor.valid()) return false;
// send header
if (!send(BasicHeaderFrame(_id, envelope))) return false;
// channel and connection still valid?
if (!monitor.valid() || !_connection) return false;
// the max payload size is the max frame size minus the bytes for headers and trailer
uint32_t maxpayload = _connection->maxPayload();
uint32_t bytessent = 0;
// the buffer
const char *data = envelope.body();
uint32_t bytesleft = envelope.bodySize();
// split up the body in multiple frames depending on the max frame size
while (bytesleft > 0)
{
// size of this chunk
uint32_t chunksize = std::min(maxpayload, bytesleft);
// send out a body frame
if (!send(BodyFrame(_id, data + bytessent, chunksize))) return false;
// channel still valid?
if (!monitor.valid()) return false;
// update counters
bytessent += chunksize;
bytesleft -= chunksize;
}
// done
return true;
}
/**
* Set the Quality of Service (QOS) for this channel
* @param prefetchCount maximum number of messages to prefetch
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* @param prefetchCount number of messages to fetch
* @param global share counter between all consumers on the same channel
*/
Deferred &ChannelImpl::setQos(uint16_t prefetchCount, bool global)
{
// send a qos frame
return push(BasicQosFrame(_id, prefetchCount, global));
}
/**
* Tell the RabbitMQ server that we're ready to consume messages
* @param queue the queue from which you want to consume
* @param tag a consumer tag that will be associated with this consume operation
* @param flags additional flags
* @param arguments additional arguments
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, const std::string& tag);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) {
*
* std::cout << "Started consuming under tag " << tag << std::endl;
*
* });
*/
DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments)
{
// the frame to send
BasicConsumeFrame frame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, false, arguments);
// send the frame, and create deferred object
auto *deferred = new DeferredConsumer(this, send(frame));
// push to list
2014-04-15 19:01:27 +08:00
push(deferred);
// done
return *deferred;
}
/**
* Cancel a running consumer
* @param tag the consumer tag
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, const std::string& tag);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) {
*
* std::cout << "Started consuming under tag " << tag << std::endl;
*
* });
*/
DeferredCancel &ChannelImpl::cancel(const std::string &tag)
{
// the cancel frame to send
BasicCancelFrame frame(_id, tag, false);
// send the frame, and create deferred object
auto *deferred = new DeferredCancel(this, send(frame));
// push to list
2014-04-15 19:01:27 +08:00
push(deferred);
// done
return *deferred;
}
2014-01-06 04:21:09 +08:00
/**
* Acknowledge a message
2014-01-06 04:21:09 +08:00
* @param deliveryTag the delivery tag
* @param flags optional flags
* @return bool
*/
bool ChannelImpl::ack(uint64_t deliveryTag, int flags)
{
// send an ack frame
return send(BasicAckFrame(_id, deliveryTag, flags & multiple));
2014-01-06 04:21:09 +08:00
}
2014-01-06 21:28:58 +08:00
/**
* Reject a message
* @param deliveryTag the delivery tag
* @param flags optional flags
* @return bool
*/
bool ChannelImpl::reject(uint64_t deliveryTag, int flags)
{
// should we reject multiple messages?
if (flags & multiple)
{
// send a nack frame
return send(BasicNackFrame(_id, deliveryTag, true, flags & requeue));
}
else
{
// send a reject frame
return send(BasicRejectFrame(_id, deliveryTag, flags & requeue));
}
2014-01-06 21:28:58 +08:00
}
2014-01-06 21:38:48 +08:00
/**
* Recover un-acked messages
* @param flags optional flags
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
2014-01-06 21:38:48 +08:00
*/
Deferred &ChannelImpl::recover(int flags)
2014-01-06 21:38:48 +08:00
{
// send a nack frame
2014-04-15 19:01:27 +08:00
return push(BasicRecoverFrame(_id, flags & requeue));
2014-01-06 21:38:48 +08:00
}
/**
* Send a frame over the channel
* @param frame frame to send
* @return bool was the frame sent?
*/
bool ChannelImpl::send(const Frame &frame)
{
// skip if channel is not connected
if (_state != state_connected || !_connection) return false;
// are we currently in synchronous mode or are there
// other frames waiting for their turn to be sent?
if (_synchronous || !_queue.empty())
{
// we need to wait until the synchronous frame has
// been processed, so queue the frame until it was
_queue.emplace(frame.synchronous(), frame.buffer());
// it was of course not actually sent but we pretend
// that it was, because no error occured
return true;
}
// enter synchronous mode if necessary
_synchronous = frame.synchronous();
// send to tcp connection
return _connection->send(frame);
}
/**
* Signal the channel that a synchronous operation
* was completed. After this operation, waiting
* frames can be sent out.
*/
void ChannelImpl::synchronized()
{
// we are no longer waiting for synchronous operations
_synchronous = false;
// we need to monitor the channel for validity
Monitor monitor(this);
// send all frames while not in synchronous mode
while (monitor.valid() && !_synchronous && !_queue.empty())
{
// retrieve the first buffer and synchronous
auto pair = std::move(_queue.front());
// remove from the list
_queue.pop();
// mark as synchronous if necessary
_synchronous = pair.first;
// send it over the connection
_connection->send(std::move(pair.second));
}
}
/**
2014-01-06 04:21:09 +08:00
* Report the received message
*/
2014-01-06 22:49:31 +08:00
void ChannelImpl::reportMessage()
{
2014-01-06 22:49:31 +08:00
// skip if there is no message
if (!_message) return;
// look for the consumer
auto iter = _consumers.find(_message->consumer());
if (iter == _consumers.end()) return;
// is this a valid callback method
if (!iter->second) return;
// after the report the channel may be destructed, monitor that
Monitor monitor(this);
// call the callback
_message->report(iter->second);
// skip if channel was destructed
if (!monitor.valid()) return;
2014-01-06 22:49:31 +08:00
// no longer need the message
delete _message; _message = nullptr;
}
/**
* Create an incoming message
* @param frame
* @return ConsumedMessage
*/
ConsumedMessage *ChannelImpl::message(const BasicDeliverFrame &frame)
{
// destruct if message is already set
if (_message) delete _message;
// construct a message
2014-01-06 22:49:31 +08:00
return _message = new ConsumedMessage(frame);
}
/**
* End of namespace
*/
}