2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Channel.cpp
|
|
|
|
|
*
|
|
|
|
|
* Implementation for a channel
|
|
|
|
|
*
|
2018-02-07 17:08:32 +08:00
|
|
|
* @copyright 2014 - 2018 Copernica BV
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
|
|
|
|
#include "includes.h"
|
2014-07-31 18:58:13 +08:00
|
|
|
#include "basicgetokframe.h"
|
2014-01-06 22:49:31 +08:00
|
|
|
#include "basicreturnframe.h"
|
|
|
|
|
#include "consumedmessage.h"
|
|
|
|
|
#include "returnedmessage.h"
|
2014-01-04 19:45:04 +08:00
|
|
|
#include "channelopenframe.h"
|
|
|
|
|
#include "channelflowframe.h"
|
2014-01-06 01:50:41 +08:00
|
|
|
#include "channelcloseokframe.h"
|
2014-01-04 19:45:04 +08:00
|
|
|
#include "channelcloseframe.h"
|
2017-06-09 04:19:55 +08:00
|
|
|
#include "confirmselectframe.h"
|
2014-01-04 19:45:04 +08:00
|
|
|
#include "transactionselectframe.h"
|
|
|
|
|
#include "transactioncommitframe.h"
|
|
|
|
|
#include "transactionrollbackframe.h"
|
|
|
|
|
#include "exchangedeclareframe.h"
|
|
|
|
|
#include "exchangedeleteframe.h"
|
|
|
|
|
#include "exchangebindframe.h"
|
|
|
|
|
#include "exchangeunbindframe.h"
|
|
|
|
|
#include "queuedeclareframe.h"
|
|
|
|
|
#include "queuebindframe.h"
|
|
|
|
|
#include "queueunbindframe.h"
|
|
|
|
|
#include "queuepurgeframe.h"
|
|
|
|
|
#include "queuedeleteframe.h"
|
2014-01-05 20:08:35 +08:00
|
|
|
#include "basicpublishframe.h"
|
|
|
|
|
#include "basicheaderframe.h"
|
|
|
|
|
#include "bodyframe.h"
|
2014-01-05 21:19:35 +08:00
|
|
|
#include "basicqosframe.h"
|
2014-01-06 01:50:41 +08:00
|
|
|
#include "basicconsumeframe.h"
|
|
|
|
|
#include "basiccancelframe.h"
|
2014-01-06 04:21:09 +08:00
|
|
|
#include "basicackframe.h"
|
2014-01-06 21:28:58 +08:00
|
|
|
#include "basicnackframe.h"
|
2014-01-06 21:38:48 +08:00
|
|
|
#include "basicrecoverframe.h"
|
2014-05-27 00:17:49 +08:00
|
|
|
#include "basicrejectframe.h"
|
2014-07-31 18:58:13 +08:00
|
|
|
#include "basicgetframe.h"
|
|
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Set up namespace
|
|
|
|
|
*/
|
|
|
|
|
namespace AMQP {
|
|
|
|
|
|
2016-06-23 20:42:50 +08:00
|
|
|
/**
|
|
|
|
|
* Constructor
|
|
|
|
|
*/
|
|
|
|
|
ChannelImpl::ChannelImpl() = default;
|
|
|
|
|
|
2014-08-20 17:47:16 +08:00
|
|
|
/**
|
2014-08-20 18:44:52 +08:00
|
|
|
* Destructor
|
2014-08-20 17:47:16 +08:00
|
|
|
*/
|
2014-08-20 18:44:52 +08:00
|
|
|
ChannelImpl::~ChannelImpl()
|
2014-08-20 17:47:16 +08:00
|
|
|
{
|
2014-08-20 18:44:52 +08:00
|
|
|
// remove this channel from the connection (but not if the connection is already destructed)
|
|
|
|
|
if (_connection) _connection->remove(this);
|
2014-08-20 17:47:16 +08:00
|
|
|
}
|
|
|
|
|
|
2017-03-03 19:56:54 +08:00
|
|
|
/**
|
|
|
|
|
* Callback that is called when an error occurs.
|
|
|
|
|
*
|
|
|
|
|
* Only one error callback can be registered. Calling this function
|
|
|
|
|
* multiple times will remove the old callback.
|
|
|
|
|
*
|
|
|
|
|
* @param callback the callback to execute
|
|
|
|
|
*/
|
|
|
|
|
void ChannelImpl::onError(const ErrorCallback &callback)
|
|
|
|
|
{
|
|
|
|
|
// store callback
|
|
|
|
|
_errorCallback = callback;
|
|
|
|
|
|
|
|
|
|
// if the channel is connected, all is ok
|
|
|
|
|
if (connected()) return;
|
|
|
|
|
|
|
|
|
|
// is the channel closing down?
|
|
|
|
|
if (_state == state_closing) return callback("Channel is closing down");
|
|
|
|
|
|
|
|
|
|
// the channel is closed, but what is the connection doing?
|
|
|
|
|
if (_connection == nullptr) return callback("Channel is not linked to a connection");
|
|
|
|
|
|
|
|
|
|
// if the connection is valid, this is a pure channel error
|
|
|
|
|
if (_connection->connected()) return callback("Channel is in an error state, but the connection is valid");
|
2017-03-07 17:10:11 +08:00
|
|
|
|
|
|
|
|
// the connection is closing down
|
|
|
|
|
if (_connection->closing()) return callback("Channel is in an error state, the AMQP connection is closing down");
|
|
|
|
|
|
|
|
|
|
// the connection is already closed
|
|
|
|
|
if (_connection->closed()) return callback("Channel is in an error state, the AMQP connection has been closed");
|
|
|
|
|
|
2017-03-03 19:56:54 +08:00
|
|
|
// direct call if channel is already in error state
|
2017-03-07 17:10:11 +08:00
|
|
|
callback("Channel is in error state, something went wrong with the AMQP connection");
|
2017-03-03 19:56:54 +08:00
|
|
|
}
|
|
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
2014-08-20 18:44:52 +08:00
|
|
|
* Initialize the object with an connection
|
2014-01-04 19:45:04 +08:00
|
|
|
* @param connection
|
|
|
|
|
*/
|
2014-08-20 18:44:52 +08:00
|
|
|
void ChannelImpl::attach(Connection *connection)
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
2014-08-20 18:44:52 +08:00
|
|
|
// get connection impl
|
|
|
|
|
_connection = &connection->_implementation;
|
|
|
|
|
|
|
|
|
|
// retrieve an ID
|
2014-08-20 17:47:16 +08:00
|
|
|
_id = _connection->add(shared_from_this());
|
2014-08-20 18:44:52 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
// check if the id is valid
|
|
|
|
|
if (_id == 0)
|
|
|
|
|
{
|
|
|
|
|
// this is invalid
|
|
|
|
|
_state = state_closed;
|
|
|
|
|
}
|
2014-08-20 19:40:29 +08:00
|
|
|
else
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
2014-08-20 19:40:29 +08:00
|
|
|
// assume channel is connected
|
2014-01-04 19:45:04 +08:00
|
|
|
_state = state_connected;
|
2014-08-20 19:40:29 +08:00
|
|
|
|
|
|
|
|
// send the open frame
|
|
|
|
|
if (send(ChannelOpenFrame(_id))) return;
|
|
|
|
|
|
|
|
|
|
// report an error
|
|
|
|
|
reportError("Channel could not be initialized", true);
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
2014-08-20 18:44:52 +08:00
|
|
|
}
|
2014-04-15 16:43:33 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Push a deferred result
|
|
|
|
|
* @param result The deferred object to push
|
|
|
|
|
*/
|
2014-09-02 16:32:55 +08:00
|
|
|
Deferred &ChannelImpl::push(const std::shared_ptr<Deferred> &deferred)
|
2014-04-15 16:43:33 +08:00
|
|
|
{
|
|
|
|
|
// do we already have an oldest?
|
2014-09-02 16:32:55 +08:00
|
|
|
if (!_oldestCallback) _oldestCallback = deferred;
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 16:43:33 +08:00
|
|
|
// do we already have a newest?
|
|
|
|
|
if (_newestCallback) _newestCallback->add(deferred);
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 16:43:33 +08:00
|
|
|
// store newest callback
|
|
|
|
|
_newestCallback = deferred;
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 19:01:27 +08:00
|
|
|
// done
|
|
|
|
|
return *deferred;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Send a frame and push a deferred result
|
|
|
|
|
* @param frame The frame to send
|
|
|
|
|
*/
|
|
|
|
|
Deferred &ChannelImpl::push(const Frame &frame)
|
|
|
|
|
{
|
|
|
|
|
// send the frame, and push the result
|
2014-09-02 16:32:55 +08:00
|
|
|
return push(std::make_shared<Deferred>(!send(frame)));
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Pause deliveries on a channel
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
2014-01-04 19:45:04 +08:00
|
|
|
* This will stop all incoming messages
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &ChannelImpl::pause()
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
2014-04-08 20:42:07 +08:00
|
|
|
// send a channel flow frame
|
2014-04-15 19:01:27 +08:00
|
|
|
return push(ChannelFlowFrame(_id, false));
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Resume a paused channel
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This will resume incoming messages
|
|
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &ChannelImpl::resume()
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
2014-04-08 20:42:07 +08:00
|
|
|
// send a channel flow frame
|
2014-04-15 19:01:27 +08:00
|
|
|
return push(ChannelFlowFrame(_id, true));
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
|
|
|
|
|
2017-06-09 04:19:55 +08:00
|
|
|
/**
|
|
|
|
|
* Put channel in a confirm mode
|
|
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
|
|
|
|
*/
|
2018-05-14 18:59:20 +08:00
|
|
|
Deferred &ChannelImpl::confirmSelect()
|
2017-06-09 04:19:55 +08:00
|
|
|
{
|
|
|
|
|
// send a transaction frame
|
|
|
|
|
return push(ConfirmSelectFrame(_id));
|
|
|
|
|
}
|
|
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Start a transaction
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &ChannelImpl::startTransaction()
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
2014-04-08 20:42:07 +08:00
|
|
|
// send a transaction frame
|
2014-04-15 19:01:27 +08:00
|
|
|
return push(TransactionSelectFrame(_id));
|
2014-04-08 20:42:07 +08:00
|
|
|
}
|
2014-01-04 19:45:04 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Commit the current transaction
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &ChannelImpl::commitTransaction()
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
2014-04-08 20:42:07 +08:00
|
|
|
// send a transaction frame
|
2014-04-15 19:01:27 +08:00
|
|
|
return push(TransactionCommitFrame(_id));
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Rollback the current transaction
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &ChannelImpl::rollbackTransaction()
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
2014-04-08 20:42:07 +08:00
|
|
|
// send a transaction frame
|
2014-04-15 19:01:27 +08:00
|
|
|
return push(TransactionRollbackFrame(_id));
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Close the current channel
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &ChannelImpl::close()
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
2014-08-20 17:47:16 +08:00
|
|
|
// this is completely pointless if not connected
|
2015-05-18 16:56:50 +08:00
|
|
|
if (!connected()) return push(std::make_shared<Deferred>(_state == state_closing));
|
2014-08-20 17:47:16 +08:00
|
|
|
|
2014-04-08 20:42:07 +08:00
|
|
|
// send a channel close frame
|
2014-04-15 19:01:27 +08:00
|
|
|
auto &handler = push(ChannelCloseFrame(_id));
|
2014-01-07 00:15:21 +08:00
|
|
|
|
2014-04-08 20:42:07 +08:00
|
|
|
// was the frame sent and are we still alive?
|
2014-04-15 19:01:27 +08:00
|
|
|
if (handler) _state = state_closing;
|
2014-01-04 19:45:04 +08:00
|
|
|
|
|
|
|
|
// done
|
2014-04-08 20:42:07 +08:00
|
|
|
return handler;
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* declare an exchange
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
* @param name name of the exchange to declare
|
|
|
|
|
* @param type type of exchange
|
|
|
|
|
* @param flags additional settings for the exchange
|
|
|
|
|
* @param arguments additional arguments
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &ChannelImpl::declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments)
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
2014-01-07 00:15:21 +08:00
|
|
|
// convert exchange type
|
2018-02-07 04:54:56 +08:00
|
|
|
const char *exchangeType = "";
|
2016-06-15 23:20:31 +08:00
|
|
|
|
|
|
|
|
// convert the exchange type into a string
|
|
|
|
|
if (type == ExchangeType::fanout) exchangeType = "fanout";
|
|
|
|
|
else if (type == ExchangeType::direct) exchangeType = "direct";
|
|
|
|
|
else if (type == ExchangeType::topic) exchangeType = "topic";
|
|
|
|
|
else if (type == ExchangeType::headers) exchangeType = "headers";
|
|
|
|
|
else if (type == ExchangeType::consistent_hash) exchangeType = "x-consistent-hash";
|
2014-01-07 00:15:21 +08:00
|
|
|
|
2018-02-07 04:54:56 +08:00
|
|
|
// the boolean options
|
|
|
|
|
bool passive = flags & AMQP::passive;
|
|
|
|
|
bool durable = flags & AMQP::durable;
|
|
|
|
|
bool autodelete = flags & AMQP::autodelete;
|
|
|
|
|
bool internal = flags & AMQP::internal;
|
|
|
|
|
bool nowait = flags & AMQP::nowait;
|
|
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
// send declare exchange frame
|
2018-02-07 04:54:56 +08:00
|
|
|
return push(ExchangeDeclareFrame(_id, name, exchangeType, passive, durable, autodelete, internal, nowait, arguments));
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* bind an exchange
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
2014-01-04 19:45:04 +08:00
|
|
|
* @param source exchange which binds to target
|
|
|
|
|
* @param target exchange to bind to
|
|
|
|
|
* @param routingKey routing key
|
|
|
|
|
* @param arguments additional arguments for binding
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-29 21:51:33 +08:00
|
|
|
Deferred &ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments)
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
|
|
|
|
// send exchange bind frame
|
2014-04-29 21:51:33 +08:00
|
|
|
return push(ExchangeBindFrame(_id, target, source, routingkey, false, arguments));
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* unbind two exchanges
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
2014-01-04 19:45:04 +08:00
|
|
|
* @param source the source exchange
|
|
|
|
|
* @param target the target exchange
|
|
|
|
|
* @param routingkey the routing key
|
|
|
|
|
* @param arguments additional unbind arguments
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-29 21:51:33 +08:00
|
|
|
Deferred &ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments)
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
|
|
|
|
// send exchange unbind frame
|
2014-04-29 21:51:33 +08:00
|
|
|
return push(ExchangeUnbindFrame(_id, target, source, routingkey, false, arguments));
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* remove an exchange
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
2014-01-04 19:45:04 +08:00
|
|
|
* @param name name of the exchange to remove
|
|
|
|
|
* @param flags additional settings for deleting the exchange
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &ChannelImpl::removeExchange(const std::string &name, int flags)
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
|
|
|
|
// send delete exchange frame
|
2015-04-24 16:46:44 +08:00
|
|
|
return push(ExchangeDeleteFrame(_id, name, (flags & ifunused) != 0, false));
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* declare a queue
|
|
|
|
|
* @param name queue name
|
|
|
|
|
* @param flags additional settings for the queue
|
|
|
|
|
* @param arguments additional arguments
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
DeferredQueue &ChannelImpl::declareQueue(const std::string &name, int flags, const Table &arguments)
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
2014-04-15 16:43:33 +08:00
|
|
|
// the frame to send
|
2015-04-24 16:46:44 +08:00
|
|
|
QueueDeclareFrame frame(_id, name, (flags & passive) != 0, (flags & durable) != 0, (flags & exclusive) != 0, (flags & autodelete) != 0, false, arguments);
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
// send the queuedeclareframe
|
2014-09-02 16:32:55 +08:00
|
|
|
auto result = std::make_shared<DeferredQueue>(!send(frame));
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 16:43:33 +08:00
|
|
|
// add the deferred result
|
2014-04-15 19:01:27 +08:00
|
|
|
push(result);
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 16:43:33 +08:00
|
|
|
// done
|
|
|
|
|
return *result;
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Bind a queue to an exchange
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
2014-01-04 19:45:04 +08:00
|
|
|
* @param exchangeName name of the exchange to bind to
|
|
|
|
|
* @param queueName name of the queue
|
|
|
|
|
* @param routingkey routingkey
|
|
|
|
|
* @param arguments additional arguments
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-29 21:51:33 +08:00
|
|
|
Deferred &ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, const Table &arguments)
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
|
|
|
|
// send the bind queue frame
|
2014-04-29 21:51:33 +08:00
|
|
|
return push(QueueBindFrame(_id, queueName, exchangeName, routingkey, false, arguments));
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Unbind a queue from an exchange
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
2014-01-04 19:45:04 +08:00
|
|
|
* @param exchange the source exchange
|
|
|
|
|
* @param queue the target queue
|
|
|
|
|
* @param routingkey the routing key
|
|
|
|
|
* @param arguments additional bind arguments
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &ChannelImpl::unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments)
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
|
|
|
|
// send the unbind queue frame
|
2014-04-15 19:01:27 +08:00
|
|
|
return push(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments));
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Purge a queue
|
|
|
|
|
* @param queue queue to purge
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
|
|
|
|
*
|
|
|
|
|
* The onSuccess() callback that you can install should have the following signature:
|
|
|
|
|
*
|
|
|
|
|
* void myCallback(AMQP::Channel *channel, uint32_t messageCount);
|
|
|
|
|
*
|
|
|
|
|
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) {
|
|
|
|
|
*
|
|
|
|
|
* std::cout << "Queue purged, all " << messageCount << " messages removed" << std::endl;
|
|
|
|
|
*
|
|
|
|
|
* });
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-29 21:51:33 +08:00
|
|
|
DeferredDelete &ChannelImpl::purgeQueue(const std::string &name)
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
2014-04-29 21:51:33 +08:00
|
|
|
// the frame to send
|
|
|
|
|
QueuePurgeFrame frame(_id, name, false);
|
|
|
|
|
|
2014-04-15 16:43:33 +08:00
|
|
|
// send the frame, and create deferred object
|
2014-09-02 16:32:55 +08:00
|
|
|
auto deferred = std::make_shared<DeferredDelete>(!send(frame));
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 16:43:33 +08:00
|
|
|
// push to list
|
2014-04-15 19:01:27 +08:00
|
|
|
push(deferred);
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 16:43:33 +08:00
|
|
|
// done
|
|
|
|
|
return *deferred;
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Remove a queue
|
|
|
|
|
* @param queue queue to remove
|
|
|
|
|
* @param flags additional flags
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
|
|
|
|
*
|
|
|
|
|
* The onSuccess() callback that you can install should have the following signature:
|
|
|
|
|
*
|
|
|
|
|
* void myCallback(AMQP::Channel *channel, uint32_t messageCount);
|
|
|
|
|
*
|
|
|
|
|
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) {
|
|
|
|
|
*
|
|
|
|
|
* std::cout << "Queue deleted, along with " << messageCount << " messages" << std::endl;
|
|
|
|
|
*
|
|
|
|
|
* });
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags)
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
2014-04-15 16:43:33 +08:00
|
|
|
// the frame to send
|
2015-04-24 16:46:44 +08:00
|
|
|
QueueDeleteFrame frame(_id, name, (flags & ifunused) != 0, (flags & ifempty) != 0, false);
|
2014-04-15 16:43:33 +08:00
|
|
|
|
|
|
|
|
// send the frame, and create deferred object
|
2014-09-02 16:32:55 +08:00
|
|
|
auto deferred = std::make_shared<DeferredDelete>(!send(frame));
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 16:43:33 +08:00
|
|
|
// push to list
|
2014-04-15 19:01:27 +08:00
|
|
|
push(deferred);
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 16:43:33 +08:00
|
|
|
// done
|
|
|
|
|
return *deferred;
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
2014-01-05 20:08:35 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Publish a message to an exchange
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
2014-01-05 20:08:35 +08:00
|
|
|
* @param exchange the exchange to publish to
|
|
|
|
|
* @param routingkey the routing key
|
|
|
|
|
* @param envelope the full envelope to send
|
|
|
|
|
* @param message the message to send
|
|
|
|
|
* @param size size of the message
|
2018-03-02 05:27:27 +08:00
|
|
|
* @param flags
|
2018-03-02 04:12:53 +08:00
|
|
|
* @return DeferredPublisher
|
2014-01-05 20:08:35 +08:00
|
|
|
*/
|
2018-03-02 05:27:27 +08:00
|
|
|
DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags)
|
2014-01-05 20:08:35 +08:00
|
|
|
{
|
2014-01-07 00:15:21 +08:00
|
|
|
// we are going to send out multiple frames, each one will trigger a call to the handler,
|
|
|
|
|
// which in turn could destruct the channel object, we need to monitor that
|
|
|
|
|
Monitor monitor(this);
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-06 01:50:41 +08:00
|
|
|
// @todo do not copy the entire buffer to individual frames
|
2018-03-02 04:12:53 +08:00
|
|
|
|
|
|
|
|
// make sure we have a deferred object to return
|
|
|
|
|
if (!_publisher) _publisher.reset(new DeferredPublisher(this));
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-05 20:08:35 +08:00
|
|
|
// send the publish frame
|
2018-03-02 05:27:27 +08:00
|
|
|
if (!send(BasicPublishFrame(_id, exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return *_publisher;
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-07 00:15:21 +08:00
|
|
|
// channel still valid?
|
2018-03-02 04:12:53 +08:00
|
|
|
if (!monitor.valid()) return *_publisher;
|
2014-01-05 20:08:35 +08:00
|
|
|
|
|
|
|
|
// send header
|
2018-03-02 04:12:53 +08:00
|
|
|
if (!send(BasicHeaderFrame(_id, envelope))) return *_publisher;
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-07 00:15:21 +08:00
|
|
|
// channel and connection still valid?
|
2018-03-02 04:12:53 +08:00
|
|
|
if (!monitor.valid() || !_connection) return *_publisher;
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-05 20:08:35 +08:00
|
|
|
// the max payload size is the max frame size minus the bytes for headers and trailer
|
2014-01-07 00:15:21 +08:00
|
|
|
uint32_t maxpayload = _connection->maxPayload();
|
2015-04-24 16:46:44 +08:00
|
|
|
uint64_t bytessent = 0;
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-05 20:08:35 +08:00
|
|
|
// the buffer
|
|
|
|
|
const char *data = envelope.body();
|
2015-04-24 16:46:44 +08:00
|
|
|
uint64_t bytesleft = envelope.bodySize();
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-05 20:08:35 +08:00
|
|
|
// split up the body in multiple frames depending on the max frame size
|
|
|
|
|
while (bytesleft > 0)
|
|
|
|
|
{
|
|
|
|
|
// size of this chunk
|
2015-04-24 16:46:44 +08:00
|
|
|
uint64_t chunksize = std::min(static_cast<uint64_t>(maxpayload), bytesleft);
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-05 20:08:35 +08:00
|
|
|
// send out a body frame
|
2018-03-02 04:12:53 +08:00
|
|
|
if (!send(BodyFrame(_id, data + bytessent, (uint32_t)chunksize))) return *_publisher;
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-07 00:15:21 +08:00
|
|
|
// channel still valid?
|
2018-03-02 04:12:53 +08:00
|
|
|
if (!monitor.valid()) return *_publisher;
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-05 20:08:35 +08:00
|
|
|
// update counters
|
|
|
|
|
bytessent += chunksize;
|
|
|
|
|
bytesleft -= chunksize;
|
|
|
|
|
}
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-05 20:08:35 +08:00
|
|
|
// done
|
2018-03-02 04:12:53 +08:00
|
|
|
return *_publisher;
|
2014-01-05 20:08:35 +08:00
|
|
|
}
|
2014-01-04 19:45:04 +08:00
|
|
|
|
2014-01-05 21:19:35 +08:00
|
|
|
/**
|
|
|
|
|
* Set the Quality of Service (QOS) for this channel
|
|
|
|
|
* @param prefetchCount maximum number of messages to prefetch
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-07-31 16:10:15 +08:00
|
|
|
*
|
|
|
|
|
* @param prefetchCount number of messages to fetch
|
|
|
|
|
* @param global share counter between all consumers on the same channel
|
2014-01-05 21:19:35 +08:00
|
|
|
*/
|
2014-07-31 16:10:15 +08:00
|
|
|
Deferred &ChannelImpl::setQos(uint16_t prefetchCount, bool global)
|
2014-01-05 21:19:35 +08:00
|
|
|
{
|
2014-01-06 01:50:41 +08:00
|
|
|
// send a qos frame
|
2014-07-31 16:10:15 +08:00
|
|
|
return push(BasicQosFrame(_id, prefetchCount, global));
|
2014-01-05 21:19:35 +08:00
|
|
|
}
|
|
|
|
|
|
2014-01-06 01:50:41 +08:00
|
|
|
/**
|
|
|
|
|
* Tell the RabbitMQ server that we're ready to consume messages
|
|
|
|
|
* @param queue the queue from which you want to consume
|
|
|
|
|
* @param tag a consumer tag that will be associated with this consume operation
|
|
|
|
|
* @param flags additional flags
|
|
|
|
|
* @param arguments additional arguments
|
2014-04-10 18:51:04 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
|
|
|
|
*
|
|
|
|
|
* The onSuccess() callback that you can install should have the following signature:
|
|
|
|
|
*
|
|
|
|
|
* void myCallback(AMQP::Channel *channel, const std::string& tag);
|
|
|
|
|
*
|
|
|
|
|
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) {
|
|
|
|
|
*
|
|
|
|
|
* std::cout << "Started consuming under tag " << tag << std::endl;
|
|
|
|
|
*
|
|
|
|
|
* });
|
2014-01-06 01:50:41 +08:00
|
|
|
*/
|
2014-04-10 18:51:04 +08:00
|
|
|
DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments)
|
2014-01-06 01:50:41 +08:00
|
|
|
{
|
2014-04-15 17:39:52 +08:00
|
|
|
// the frame to send
|
2015-04-24 16:46:44 +08:00
|
|
|
BasicConsumeFrame frame(_id, queue, tag, (flags & nolocal) != 0, (flags & noack) != 0, (flags & exclusive) != 0, false, arguments);
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 17:39:52 +08:00
|
|
|
// send the frame, and create deferred object
|
2014-09-02 16:32:55 +08:00
|
|
|
auto deferred = std::make_shared<DeferredConsumer>(this, !send(frame));
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 17:39:52 +08:00
|
|
|
// push to list
|
2014-04-15 19:01:27 +08:00
|
|
|
push(deferred);
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 17:39:52 +08:00
|
|
|
// done
|
|
|
|
|
return *deferred;
|
2014-01-06 01:50:41 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Cancel a running consumer
|
|
|
|
|
* @param tag the consumer tag
|
2014-04-10 18:51:04 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
|
|
|
|
*
|
|
|
|
|
* The onSuccess() callback that you can install should have the following signature:
|
|
|
|
|
*
|
2014-07-31 18:58:13 +08:00
|
|
|
* void myCallback(const std::string& tag);
|
2014-04-10 18:51:04 +08:00
|
|
|
*
|
2014-07-31 18:58:13 +08:00
|
|
|
* For example: channel.declareQueue("myqueue").onSuccess([](const std::string& tag) {
|
2014-04-10 18:51:04 +08:00
|
|
|
*
|
|
|
|
|
* std::cout << "Started consuming under tag " << tag << std::endl;
|
|
|
|
|
*
|
|
|
|
|
* });
|
2014-01-06 01:50:41 +08:00
|
|
|
*/
|
2014-04-29 21:51:33 +08:00
|
|
|
DeferredCancel &ChannelImpl::cancel(const std::string &tag)
|
2014-01-06 01:50:41 +08:00
|
|
|
{
|
2014-04-15 16:43:33 +08:00
|
|
|
// the cancel frame to send
|
2014-04-29 21:51:33 +08:00
|
|
|
BasicCancelFrame frame(_id, tag, false);
|
2014-04-15 16:43:33 +08:00
|
|
|
|
|
|
|
|
// send the frame, and create deferred object
|
2014-09-02 16:32:55 +08:00
|
|
|
auto deferred = std::make_shared<DeferredCancel>(this, !send(frame));
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 16:43:33 +08:00
|
|
|
// push to list
|
2014-04-15 19:01:27 +08:00
|
|
|
push(deferred);
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 16:43:33 +08:00
|
|
|
// done
|
|
|
|
|
return *deferred;
|
2014-01-06 01:50:41 +08:00
|
|
|
}
|
|
|
|
|
|
2014-07-31 18:58:13 +08:00
|
|
|
/**
|
|
|
|
|
* Retrieve a single message from RabbitMQ
|
|
|
|
|
*
|
|
|
|
|
* When you call this method, you can get one single message from the queue (or none
|
|
|
|
|
* at all if the queue is empty). The deferred object that is returned, should be used
|
|
|
|
|
* to install a onEmpty() and onSuccess() callback function that will be called
|
|
|
|
|
* when the message is consumed and/or when the message could not be consumed.
|
|
|
|
|
*
|
|
|
|
|
* The following flags are supported:
|
|
|
|
|
*
|
|
|
|
|
* - noack if set, consumed messages do not have to be acked, this happens automatically
|
|
|
|
|
*
|
|
|
|
|
* @param queue name of the queue to consume from
|
|
|
|
|
* @param flags optional flags
|
|
|
|
|
*
|
|
|
|
|
* The object returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onEmpty(), onError() and onFinalize() methods.
|
|
|
|
|
*
|
|
|
|
|
* The onSuccess() callback has the following signature:
|
|
|
|
|
*
|
|
|
|
|
* void myCallback(const Message &message, uint64_t deliveryTag, bool redelivered);
|
|
|
|
|
*
|
|
|
|
|
* For example: channel.get("myqueue").onSuccess([](const Message &message, uint64_t deliveryTag, bool redelivered) {
|
|
|
|
|
*
|
|
|
|
|
* std::cout << "Message fetched" << std::endl;
|
|
|
|
|
*
|
|
|
|
|
* }).onEmpty([]() {
|
|
|
|
|
*
|
|
|
|
|
* std::cout << "Queue is empty" << std::endl;
|
|
|
|
|
*
|
|
|
|
|
* });
|
|
|
|
|
*/
|
|
|
|
|
DeferredGet &ChannelImpl::get(const std::string &queue, int flags)
|
|
|
|
|
{
|
|
|
|
|
// the get frame to send
|
2015-04-24 16:46:44 +08:00
|
|
|
BasicGetFrame frame(_id, queue, (flags & noack) != 0);
|
2014-07-31 18:58:13 +08:00
|
|
|
|
|
|
|
|
// send the frame, and create deferred object
|
2014-09-02 16:32:55 +08:00
|
|
|
auto deferred = std::make_shared<DeferredGet>(this, !send(frame));
|
2014-07-31 18:58:13 +08:00
|
|
|
|
|
|
|
|
// push to list
|
|
|
|
|
push(deferred);
|
|
|
|
|
|
|
|
|
|
// done
|
|
|
|
|
return *deferred;
|
|
|
|
|
}
|
|
|
|
|
|
2014-01-06 04:21:09 +08:00
|
|
|
/**
|
2014-04-10 18:51:04 +08:00
|
|
|
* Acknowledge a message
|
2014-01-06 04:21:09 +08:00
|
|
|
* @param deliveryTag the delivery tag
|
|
|
|
|
* @param flags optional flags
|
|
|
|
|
* @return bool
|
|
|
|
|
*/
|
|
|
|
|
bool ChannelImpl::ack(uint64_t deliveryTag, int flags)
|
|
|
|
|
{
|
|
|
|
|
// send an ack frame
|
2015-04-24 16:46:44 +08:00
|
|
|
return send(BasicAckFrame(_id, deliveryTag, (flags & multiple) != 0));
|
2014-01-06 04:21:09 +08:00
|
|
|
}
|
|
|
|
|
|
2014-01-06 21:28:58 +08:00
|
|
|
/**
|
|
|
|
|
* Reject a message
|
|
|
|
|
* @param deliveryTag the delivery tag
|
|
|
|
|
* @param flags optional flags
|
|
|
|
|
* @return bool
|
|
|
|
|
*/
|
|
|
|
|
bool ChannelImpl::reject(uint64_t deliveryTag, int flags)
|
|
|
|
|
{
|
2014-05-27 00:17:49 +08:00
|
|
|
// should we reject multiple messages?
|
|
|
|
|
if (flags & multiple)
|
|
|
|
|
{
|
|
|
|
|
// send a nack frame
|
2015-04-24 16:46:44 +08:00
|
|
|
return send(BasicNackFrame(_id, deliveryTag, true, (flags & requeue) != 0));
|
2014-05-27 00:17:49 +08:00
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
// send a reject frame
|
2015-04-24 16:46:44 +08:00
|
|
|
return send(BasicRejectFrame(_id, deliveryTag, (flags & requeue) != 0));
|
2014-05-27 00:17:49 +08:00
|
|
|
}
|
2014-01-06 21:28:58 +08:00
|
|
|
}
|
|
|
|
|
|
2014-01-06 21:38:48 +08:00
|
|
|
/**
|
|
|
|
|
* Recover un-acked messages
|
|
|
|
|
* @param flags optional flags
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-06 21:38:48 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &ChannelImpl::recover(int flags)
|
2014-01-06 21:38:48 +08:00
|
|
|
{
|
|
|
|
|
// send a nack frame
|
2015-04-24 16:46:44 +08:00
|
|
|
return push(BasicRecoverFrame(_id, (flags & requeue) != 0));
|
2014-01-06 21:38:48 +08:00
|
|
|
}
|
|
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Send a frame over the channel
|
|
|
|
|
* @param frame frame to send
|
2014-01-07 00:15:21 +08:00
|
|
|
* @return bool was the frame sent?
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-01-07 00:15:21 +08:00
|
|
|
bool ChannelImpl::send(const Frame &frame)
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
2014-01-07 00:15:21 +08:00
|
|
|
// skip if channel is not connected
|
2014-08-20 17:47:16 +08:00
|
|
|
if (_state == state_closed || !_connection) return false;
|
|
|
|
|
|
|
|
|
|
// if we're busy closing, we pretend that the send operation was a
|
|
|
|
|
// success. this causes the deferred object to be created, and to be
|
|
|
|
|
// added to the list of deferred objects. it will be notified about
|
|
|
|
|
// the error when the close operation succeeds
|
|
|
|
|
if (_state == state_closing) return true;
|
2018-02-07 17:08:32 +08:00
|
|
|
|
2014-04-29 21:51:33 +08:00
|
|
|
// are we currently in synchronous mode or are there
|
|
|
|
|
// other frames waiting for their turn to be sent?
|
|
|
|
|
if (_synchronous || !_queue.empty())
|
|
|
|
|
{
|
|
|
|
|
// we need to wait until the synchronous frame has
|
|
|
|
|
// been processed, so queue the frame until it was
|
2017-03-09 07:18:53 +08:00
|
|
|
_queue.emplace(frame.synchronous(), frame);
|
2014-04-29 21:51:33 +08:00
|
|
|
|
|
|
|
|
// it was of course not actually sent but we pretend
|
|
|
|
|
// that it was, because no error occured
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
2014-01-07 05:34:54 +08:00
|
|
|
// send to tcp connection
|
2014-08-20 19:40:29 +08:00
|
|
|
if (!_connection->send(frame)) return false;
|
|
|
|
|
|
|
|
|
|
// frame was sent, if this was a synchronous frame, we now have to wait
|
|
|
|
|
_synchronous = frame.synchronous();
|
|
|
|
|
|
|
|
|
|
// done
|
|
|
|
|
return true;
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
|
|
|
|
|
2014-04-29 21:51:33 +08:00
|
|
|
/**
|
2014-08-20 17:47:16 +08:00
|
|
|
* Signal the channel that a synchronous operation was completed. After
|
|
|
|
|
* this operation, waiting frames can be sent out.
|
2014-04-29 21:51:33 +08:00
|
|
|
*/
|
2014-08-20 17:47:16 +08:00
|
|
|
void ChannelImpl::onSynchronized()
|
2014-04-29 21:51:33 +08:00
|
|
|
{
|
|
|
|
|
// we are no longer waiting for synchronous operations
|
|
|
|
|
_synchronous = false;
|
|
|
|
|
|
|
|
|
|
// we need to monitor the channel for validity
|
|
|
|
|
Monitor monitor(this);
|
|
|
|
|
|
|
|
|
|
// send all frames while not in synchronous mode
|
2017-03-09 07:18:53 +08:00
|
|
|
while (_connection && !_synchronous && !_queue.empty())
|
2014-04-29 21:51:33 +08:00
|
|
|
{
|
|
|
|
|
// retrieve the first buffer and synchronous
|
2017-03-09 16:25:26 +08:00
|
|
|
auto &pair = _queue.front();
|
2014-04-29 21:51:33 +08:00
|
|
|
|
|
|
|
|
// mark as synchronous if necessary
|
|
|
|
|
_synchronous = pair.first;
|
|
|
|
|
|
|
|
|
|
// send it over the connection
|
2017-03-09 16:25:26 +08:00
|
|
|
_connection->send(std::move(pair.second));
|
2017-03-09 07:18:53 +08:00
|
|
|
|
2017-03-09 16:25:26 +08:00
|
|
|
// the user space handler may have destructed this channel object
|
2017-03-09 07:18:53 +08:00
|
|
|
if (!monitor.valid()) return;
|
|
|
|
|
|
|
|
|
|
// remove from the list
|
|
|
|
|
_queue.pop();
|
2014-04-29 21:51:33 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2014-08-20 17:47:16 +08:00
|
|
|
/**
|
|
|
|
|
* Report an error message on a channel
|
|
|
|
|
* @param message the error message
|
|
|
|
|
* @param notifyhandler should the channel-wide handler also be called?
|
|
|
|
|
*/
|
|
|
|
|
void ChannelImpl::reportError(const char *message, bool notifyhandler)
|
|
|
|
|
{
|
|
|
|
|
// change state
|
|
|
|
|
_state = state_closed;
|
|
|
|
|
_synchronous = false;
|
|
|
|
|
|
|
|
|
|
// the queue of messages that still have to sent can be emptied now
|
|
|
|
|
// (we do this by moving the current queue into an unused variable)
|
|
|
|
|
auto queue(std::move(_queue));
|
|
|
|
|
|
|
|
|
|
// we are going to call callbacks that could destruct the channel
|
|
|
|
|
Monitor monitor(this);
|
|
|
|
|
|
|
|
|
|
// call the oldest
|
|
|
|
|
if (_oldestCallback)
|
|
|
|
|
{
|
|
|
|
|
// copy the callback (so that it can not be destructed during
|
|
|
|
|
// the "reportError" call
|
|
|
|
|
auto cb = _oldestCallback;
|
|
|
|
|
|
|
|
|
|
// call the callback
|
2014-09-02 16:32:55 +08:00
|
|
|
auto next = cb->reportError(message);
|
2014-08-20 17:47:16 +08:00
|
|
|
|
|
|
|
|
// leap out if channel no longer exists
|
|
|
|
|
if (!monitor.valid()) return;
|
|
|
|
|
|
|
|
|
|
// set the oldest callback
|
2014-09-02 16:32:55 +08:00
|
|
|
_oldestCallback = next;
|
2014-08-20 17:47:16 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// clean up all deferred other objects
|
|
|
|
|
while (_oldestCallback)
|
|
|
|
|
{
|
|
|
|
|
// copy the callback (so that it can not be destructed during
|
|
|
|
|
// the "reportError" call
|
|
|
|
|
auto cb = _oldestCallback;
|
|
|
|
|
|
|
|
|
|
// call the callback
|
2014-09-02 16:32:55 +08:00
|
|
|
auto next = cb->reportError("Channel is in error state");
|
2014-08-20 17:47:16 +08:00
|
|
|
|
|
|
|
|
// leap out if channel no longer exists
|
|
|
|
|
if (!monitor.valid()) return;
|
|
|
|
|
|
|
|
|
|
// set the oldest callback
|
2014-09-02 16:32:55 +08:00
|
|
|
_oldestCallback = next;
|
2014-08-20 17:47:16 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// all callbacks have been processed, so we also can reset the pointer to the newest
|
|
|
|
|
_newestCallback = nullptr;
|
|
|
|
|
|
|
|
|
|
// inform handler
|
|
|
|
|
if (notifyhandler && _errorCallback) _errorCallback(message);
|
|
|
|
|
|
|
|
|
|
// leap out if object no longer exists
|
|
|
|
|
if (!monitor.valid()) return;
|
|
|
|
|
|
2016-06-23 20:42:50 +08:00
|
|
|
// the connection no longer has to know that this channel exists,
|
2014-08-20 17:47:16 +08:00
|
|
|
// because the channel ID is no longer in use
|
|
|
|
|
if (_connection) _connection->remove(this);
|
|
|
|
|
}
|
|
|
|
|
|
2014-01-06 01:50:41 +08:00
|
|
|
/**
|
2018-03-02 00:34:27 +08:00
|
|
|
* Get the current receiver for a given consumer tag
|
|
|
|
|
* @param consumertag the consumer frame
|
|
|
|
|
* @return DeferredConsumer
|
2014-01-06 01:50:41 +08:00
|
|
|
*/
|
2018-03-02 00:34:27 +08:00
|
|
|
DeferredConsumer *ChannelImpl::consumer(const std::string &consumertag) const
|
2014-01-06 01:50:41 +08:00
|
|
|
{
|
2018-03-02 00:34:27 +08:00
|
|
|
// look in the map
|
|
|
|
|
auto iter = _consumers.find(consumertag);
|
|
|
|
|
|
|
|
|
|
// return the result
|
|
|
|
|
return iter == _consumers.end() ? nullptr : iter->second.get();
|
2014-07-31 18:58:13 +08:00
|
|
|
}
|
|
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* End of namespace
|
|
|
|
|
*/
|
|
|
|
|
}
|
|
|
|
|
|