Implemented deferred consumers and a setTimeout method on the connection handler for indicating immediate failures on deferred objects

This commit is contained in:
Martijn Otto 2014-04-14 14:10:57 +02:00
parent 3348e2881c
commit 1c0495378a
8 changed files with 151 additions and 80 deletions

View File

@ -6,6 +6,8 @@
* @documentation public
*/
#pragma once
// base C++ include files
#include <vector>
#include <string>

View File

@ -22,24 +22,6 @@ private:
*/
ConnectionImpl _implementation;
/**
* Function to execute code after a certain timeout.
*
* If the timeout is 0, the code is supposed to be run
* in the next iteration of the event loop.
*
* This is a simple placeholder function that will just
* execute the code immediately, it should be overridden
* by the timeout function the used event loop has.
*
* @param timeout the amount of time to wait
* @param callback the callback to execute after the timeout
*/
std::function<void(double timeout, const std::function<void()>)> _timeoutHandler = [](double timeout, const std::function<void()>& callback) {
// execute callback immediately
callback();
};
public:
/**
* Construct an AMQP object based on full login data

View File

@ -4,7 +4,7 @@
*
* Interface that should be implemented by the caller of the library and
* that is passed to the AMQP connection. This interface contains all sorts
* of methods that are called when data needs to be sent, or when the
* of methods that are called when data needs to be sent, or when the
* AMQP connection ends up in a broken state.
*
* @copyright 2014 Copernica BV
@ -21,10 +21,22 @@ namespace AMQP {
class ConnectionHandler
{
public:
/**
* Set a function to be executed after a given timeout.
*
* This function is not strictly necessary to implement. If you
* do not implement it, certain channel methods that fail
* immediately will not be reported.
*
* @param timeout number of seconds to wait
* @param callback function to execute once time runs out
*/
virtual void setTimeout(double seconds, const std::function<void()>& callback) {}
/**
* Method that is called when data needs to be sent over the network
*
* Note that the AMQP library does no buffering by itself. This means
* Note that the AMQP library does no buffering by itself. This means
* that this method should always send out all data or do the buffering
* itself.
*
@ -33,28 +45,28 @@ public:
* @param size Size of the buffer
*/
virtual void onData(Connection *connection, const char *buffer, size_t size) = 0;
/**
* When the connection ends up in an error state this method is called.
* This happens when data comes in that does not match the AMQP protocol
*
*
* After this method is called, the connection no longer is in a valid
* state and can no longer be used.
*
*
* This method has an empty default implementation, although you are very
* much advised to implement it. Because when an error occurs, the connection
* is no longer usable, so you probably want to know.
*
*
* @param connection The connection that entered the error state
* @param message Error message
*/
virtual void onError(Connection *connection, const std::string &message) {}
/**
* Method that is called when the login attempt succeeded. After this method
* was called, the connection is ready to use. This is the first method
* that is normally called after you've constructed the connection object.
*
*
* According to the AMQP protocol, you must wait for the connection to become
* ready (and this onConnected method to be called) before you can start
* using the Connection object. However, this AMQP library will cache all
@ -64,17 +76,17 @@ public:
* @param connection The connection that can now be used
*/
virtual void onConnected(Connection *connection) {}
/**
* Method that is called when the connection was closed.
*
*
* This is the counter part of a call to Connection::close() and it confirms
* that the connection was correctly closed.
*
*
* @param connection The connection that was closed and that is now unusable
*/
virtual void onClosed(Connection *connection) {}
};
/**

View File

@ -298,7 +298,7 @@ public:
* The actual connection is a friend and can construct this class
*/
friend class Connection;
friend class ChannelImpl;
};
/**

View File

@ -56,11 +56,8 @@ private:
*
* @param error description of the error that occured
*/
void error(const std::string& error)
void error(const std::string& error) const
{
// we are now in a failed state
_failed = true;
// execute callbacks if registered
if (_errorCallback) _errorCallback(_channel, error);
if (_finalizeCallback) _finalizeCallback(_channel, error);

View File

@ -0,0 +1,76 @@
/**
* DeferredConsumer.h
*
* Deferred callback for consumers
*
* @copyright 2014 Copernica BV
*/
/**
* Set up namespace
*/
namespace AMQP {
/**
* We extend from the default deferred and add extra functionality
*/
class DeferredConsumer : public Deferred<const std::string&>
{
private:
/**
* Callback to execute when a message arrives
*/
std::function<void(Channel *channel, const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered)> _messageCallback;
/**
* Process a message
*
* @param message the message to process
* @param deliveryTag the message delivery tag
* @param consumerTag the tag we are consuming under
* @param is this a redelivered message?
*/
void message(const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) const
{
// do we have a valid callback
if (_messageCallback) _messageCallback(_channel, message, deliveryTag, consumerTag, redelivered);
}
/**
* The channel implementation may call our
* private members and construct us
*/
friend class ChannelImpl;
friend class ConsumedMessage;
protected:
/**
* Protected constructor that can only be called
* from within the channel implementation
*
* @param channel the channel we operate under
* @param boolea are we already failed?
*/
DeferredConsumer(Channel *channel, bool failed = false) :
Deferred(channel, failed)
{}
public:
/**
* Register a function to be called when a message arrives
*
* Only one callback can be registered. Successive calls
* to this function will clear callbacks registered before.
*
* @param callback the callback to execute
*/
DeferredConsumer& onReceived(const std::function<void(Channel *channel, const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered)>& callback)
{
// store callback
_messageCallback = callback;
return *this;
}
};
/**
* End namespace
*/
}

View File

@ -540,25 +540,27 @@ template <typename... Arguments>
Deferred<Arguments...>& ChannelImpl::send(const Frame &frame, const char *message)
{
// create a new deferred handler and get a pointer to it
auto &handler = _callbacks.push_back(Deferred<Arguments...>(_parent));
// note: cannot use auto here or the lambda below chokes
// when compiling under gcc 4.8
Deferred<Arguments...> *handler = &_callbacks.push_back(Deferred<Arguments...>(_parent));
// send the frame over the channel
if (!send(frame))
{
// we can immediately put the handler in failed state
handler._failed = true;
handler->_failed = true;
// the frame could not be send
// we should register an error
// on the handler, but only after
// a timeout, so a handler can
// be attached first
// TODO
// register an error on the deferred handler
// after a timeout, so it gets called only
// after a possible handler was installed.
_connection->_handler->setTimeout(0, [handler, message]() {
// emit an error on the handler
handler->error(message);
});
}
// return the new handler
return handler;
return *handler;
}
/**

View File

@ -17,13 +17,13 @@ namespace AMQP {
/**
* Construct an AMQP object based on full login data
*
*
* The first parameter is a handler object. This handler class is
* an interface that should be implemented by the caller.
*
*
* Note that the constructor is private to ensure that nobody can construct
* this class, only the real Connection class via a friend construct
*
*
* @param parent Parent connection object
* @param handler Connection handler
* @param login Login data
@ -42,7 +42,7 @@ ConnectionImpl::~ConnectionImpl()
{
// close the connection in a nice fashion
close();
// invalidate all channels, so they will no longer call methods on this channel object
for (auto iter = _channels.begin(); iter != _channels.end(); iter++) iter->second->invalidate();
}
@ -57,20 +57,20 @@ uint16_t ConnectionImpl::add(ChannelImpl *channel)
{
// check if we have exceeded the limit already
if (_maxChannels > 0 && _channels.size() >= _maxChannels) return 0;
// keep looping to find an id that is not in use
while (true)
{
// is this id in use?
if (_nextFreeChannel > 0 && _channels.find(_nextFreeChannel) == _channels.end()) break;
// id is in use, move on
_nextFreeChannel++;
}
// we have a new channel
_channels[_nextFreeChannel] = channel;
// done
return _nextFreeChannel++;
}
@ -90,7 +90,7 @@ void ConnectionImpl::remove(ChannelImpl *channel)
/**
* Parse the buffer into a recognized frame
*
*
* Every time that data comes in on the connection, you should call this method to parse
* the incoming data, and let it handle by the AMQP library. This method returns the number
* of bytes that were processed.
@ -108,13 +108,13 @@ size_t ConnectionImpl::parse(const char *buffer, size_t size)
{
// do not parse if already in an error state
if (_state == state_closed) return 0;
// number of bytes processed
size_t processed = 0;
// create a monitor object that checks if the connection still exists
Monitor monitor(this);
// keep looping until we have processed all bytes, and the monitor still
// indicates that the connection is in a valid state
while (size > 0 && monitor.valid())
@ -131,7 +131,7 @@ size_t ConnectionImpl::parse(const char *buffer, size_t size)
// number of bytes processed
size_t bytes = receivedFrame.totalSize();
// add bytes
processed += bytes; size -= bytes; buffer += bytes;
}
@ -139,12 +139,12 @@ size_t ConnectionImpl::parse(const char *buffer, size_t size)
{
// something terrible happened on the protocol (like data out of range)
reportError(exception.what());
// done
return processed;
}
}
// done
return processed;
}
@ -161,13 +161,13 @@ bool ConnectionImpl::close()
// mark that the object is closed
_closed = true;
// if still busy with handshake, we delay closing for a while
if (_state == state_handshake || _state == state_protocol) return true;
// perform the close operation
sendClose();
// done
return true;
}
@ -181,26 +181,26 @@ bool ConnectionImpl::sendClose()
{
// after the send operation the object could be dead
Monitor monitor(this);
// loop over all channels
for (auto iter = _channels.begin(); iter != _channels.end(); iter++)
{
// close the channel
iter->second->close();
// we could be dead now
if (!monitor.valid()) return false;
}
// send the close frame
send(ConnectionCloseFrame(0, "shutdown"));
// leap out if object no longer is alive
if (!monitor.valid()) return false;
// we're in a new state
_state = state_closing;
// done
return true;
}
@ -214,20 +214,20 @@ void ConnectionImpl::setConnected()
_state = state_connected;
// if the close operation was already called, we do that again now again
// so that the actual messages to close down the connection and the channel
// so that the actual messages to close down the connection and the channel
// are appended to the queue
if (_closed && !sendClose()) return;
// we're going to call the handler, which can destruct the connection,
// so we must monitor if the queue object is still valid after calling
Monitor monitor(this);
// inform handler
_handler->onConnected(_parent);
// leap out if the connection no longer exists
if (!monitor.valid()) return;
// empty the queue of messages
while (!_queue.empty())
{
@ -236,10 +236,10 @@ void ConnectionImpl::setConnected()
// remove it from the queue
_queue.pop();
// send it
_handler->onData(_parent, buffer.data(), buffer.size());
// leap out if the connection was destructed
if (!monitor.valid()) return;
}
@ -254,16 +254,16 @@ bool ConnectionImpl::send(const Frame &frame)
{
// its not possible to send anything if closed or closing down
if (_state == state_closing || _state == state_closed) return false;
// we need an output buffer
OutBuffer buffer(frame.totalSize());
// fill the buffer
frame.fill(buffer);
// append an end of frame byte (but not when still negotiating the protocol)
if (frame.needsSeparator()) buffer.add((uint8_t)206);
// are we still setting up the connection?
if ((_state == state_connected && _queue.size() == 0) || frame.partOfHandshake())
{
@ -275,7 +275,7 @@ bool ConnectionImpl::send(const Frame &frame)
// the connection is still being set up, so we need to delay the message sending
_queue.push(std::move(buffer));
}
// done
return true;
}