Implemented buffering of outgoing messages before the connection is set up, so that it is possible to create channel objects and call AMQP methods even while the connection handshake is still in progress

This commit is contained in:
Emiel Bruijntjes 2014-01-04 11:34:36 -08:00
parent 158f58a15f
commit e5b705b742
16 changed files with 173 additions and 97 deletions

View File

@ -33,7 +33,7 @@ with a number of methods that are called by the library every time it wants
to send out data, or when it needs to inform you that an error occured. to send out data, or when it needs to inform you that an error occured.
````c++ ````c++
#include <amqp.h> #include <libamqp.h>
class MyConnectionHandler : public AMQP::ConnectionHandler class MyConnectionHandler : public AMQP::ConnectionHandler
{ {
@ -178,7 +178,7 @@ so you do not have to implement all of them - only the ones that you are interes
in. in.
````c++ ````c++
#include <amqp.h> #include <libamqp.h>
class MyChannelHandler : public AMQP::ChannelHandler class MyChannelHandler : public AMQP::ChannelHandler
{ {

1
amqp
View File

@ -1 +0,0 @@
include

68
amqp.h
View File

@ -1,68 +0,0 @@
/**
* AMQP.h
*
* Starting point for all includes of the Copernica AMQP library
*
* @documentation public
*/
// base C++ include files
#include <vector>
#include <cstring>
#include <sstream>
#include <string>
#include <limits>
#include <memory>
#include <type_traits>
#include <cstdlib>
#include <map>
// base C include files
#include <stdint.h>
#include <math.h>
#include <stddef.h>
#include <endian.h>
#include <string.h>
// other libraries
#include <copernica/event.h>
#include <copernica/network.h>
// forward declarations
#include <copernica/amqp/classes.h>
// utility classes
#include <copernica/amqp/receivedframe.h>
#include <copernica/amqp/outbuffer.h>
#include <copernica/amqp/watchable.h>
#include <copernica/amqp/monitor.h>
// amqp types
#include <copernica/amqp/field.h>
#include <copernica/amqp/numericfield.h>
#include <copernica/amqp/decimalfield.h>
#include <copernica/amqp/stringfield.h>
#include <copernica/amqp/booleanset.h>
#include <copernica/amqp/fieldproxy.h>
#include <copernica/amqp/table.h>
#include <copernica/amqp/array.h>
// envelope for publishing and consuming
#include <copernica/amqp/envelopefield.h>
#include <copernica/amqp/envelope.h>
// mid level includes
#include <copernica/amqp/exchangetype.h>
#include <copernica/amqp/flags.h>
#include <copernica/amqp/channelhandler.h>
#include <copernica/amqp/channelimpl.h>
#include <copernica/amqp/channel.h>
#include <copernica/amqp/login.h>
#include <copernica/amqp/connectionhandler.h>
#include <copernica/amqp/connectionimpl.h>
#include <copernica/amqp/connection.h>

View File

@ -79,6 +79,12 @@ protected:
* @var string * @var string
*/ */
std::string _vhost; std::string _vhost;
/**
* Queued messages that should be sent after the connection has been established
* @var queue
*/
std::queue<std::string> _queue;
private: private:
@ -135,14 +141,7 @@ public:
/** /**
* Mark the connection as connected * Mark the connection as connected
*/ */
void setConnected() void setConnected();
{
// store connected state
_state = state_connected;
// inform handler
_handler->onConnected(_parent);
}
/** /**
* Retrieve the login data * Retrieve the login data

58
libamqp.h Normal file
View File

@ -0,0 +1,58 @@
/**
* AMQP.h
*
* Starting point for all includes of the Copernica AMQP library
*
* @documentation public
*/
// base C++ include files
#include <vector>
#include <string>
#include <memory>
#include <map>
#include <queue>
#include <set>
#include <limits>
#include <cstddef>
#include <cstring>
// base C include files
#include <stdint.h>
#include <math.h>
#include <endian.h>
// forward declarations
#include <libamqp/classes.h>
// utility classes
#include <libamqp/receivedframe.h>
#include <libamqp/outbuffer.h>
#include <libamqp/watchable.h>
#include <libamqp/monitor.h>
// amqp types
#include <libamqp/field.h>
#include <libamqp/numericfield.h>
#include <libamqp/decimalfield.h>
#include <libamqp/stringfield.h>
#include <libamqp/booleanset.h>
#include <libamqp/fieldproxy.h>
#include <libamqp/table.h>
#include <libamqp/array.h>
// envelope for publishing and consuming
#include <libamqp/envelopefield.h>
#include <libamqp/envelope.h>
// mid level includes
#include <libamqp/exchangetype.h>
#include <libamqp/flags.h>
#include <libamqp/channelhandler.h>
#include <libamqp/channelimpl.h>
#include <libamqp/channel.h>
#include <libamqp/login.h>
#include <libamqp/connectionhandler.h>
#include <libamqp/connectionimpl.h>
#include <libamqp/connection.h>

View File

@ -173,6 +173,42 @@ bool ConnectionImpl::close()
return true; return true;
} }
/**
* Mark the connection as connected
*/
void ConnectionImpl::setConnected()
{
// store connected state
_state = state_connected;
// we're going to call the handler, which can destruct the connection,
// so we must monitor if the queue object is still valid after calling the
// handler
Monitor monitor(this);
// inform handler
_handler->onConnected(_parent);
// leap out if the connection no longer exists
if (!monitor.valid()) return;
// empty the queue of messages
while (!_queue.empty())
{
// get the next message
std::string message(_queue.front());
// remove it from the queue
_queue.pop();
// send it
_handler->onData(_parent, message.data(), message.size());
// leap out if the connection was destructed
if (!monitor.valid()) return;
}
}
/** /**
* Send a frame over the connection * Send a frame over the connection
* @param frame The frame to send * @param frame The frame to send
@ -180,8 +216,6 @@ bool ConnectionImpl::close()
*/ */
size_t ConnectionImpl::send(const Frame &frame) size_t ConnectionImpl::send(const Frame &frame)
{ {
std::cout << "send frame of " << frame.totalSize() << " bytes" << std::endl;
// we need an output buffer // we need an output buffer
OutBuffer buffer(frame.totalSize()); OutBuffer buffer(frame.totalSize());
@ -191,8 +225,17 @@ size_t ConnectionImpl::send(const Frame &frame)
// append an end of frame byte (but not when still negotiating the protocol) // append an end of frame byte (but not when still negotiating the protocol)
if (_state != state_protocol) buffer.add((uint8_t)206); if (_state != state_protocol) buffer.add((uint8_t)206);
// send the buffer // are we still setting up the connection?
_handler->onData(_parent, buffer.data(), buffer.size()); if ((_state == state_protocol || _state == state_handshake) && !frame.partOfHandshake())
{
// the connection is still being set up, so we need to delay the message sending
_queue.push(std::string(buffer.data(), buffer.size()));
}
else
{
// send the buffer
_handler->onData(_parent, buffer.data(), buffer.size());
}
// done // done
return buffer.size(); return buffer.size();

View File

@ -99,6 +99,15 @@ public:
{ {
return _vhost; return _vhost;
} }
/**
* Is this a frame that is part of the connection setup?
* @return bool
*/
virtual bool partOfHandshake() const override
{
return true;
}
}; };
/** /**

View File

@ -22,8 +22,6 @@ bool ConnectionStartFrame::process(ConnectionImpl *connection)
{ {
// @todo we must still be in protocol handshake mode // @todo we must still be in protocol handshake mode
// move connection to handshake mode
connection->setProtocolOk();
// the peer properties // the peer properties
Table properties; Table properties;
@ -35,11 +33,11 @@ bool ConnectionStartFrame::process(ConnectionImpl *connection)
properties["copyright"] = "Copyright 2014 Copernica BV"; properties["copyright"] = "Copyright 2014 Copernica BV";
properties["information"] = ""; properties["information"] = "";
// the start ok frame we'd like to send back // move connection to handshake mode
ConnectionStartOKFrame frame(properties, "PLAIN", connection->login().saslPlain(), "en_US"); connection->setProtocolOk();
// send back a connection start ok frame // send back a connection start ok frame
connection->send(frame); connection->send(ConnectionStartOKFrame(properties, "PLAIN", connection->login().saslPlain(), "en_US"));
// done // done
return true; return true;

View File

@ -139,6 +139,16 @@ public:
{ {
return _locale; return _locale;
} }
/**
* Is this a frame that is part of the connection setup?
* @return bool
*/
virtual bool partOfHandshake() const override
{
return true;
}
}; };
/** /**

View File

@ -26,17 +26,17 @@ bool ConnectionTuneFrame::process(ConnectionImpl *connection)
// remember this in the connection // remember this in the connection
connection->setCapacity(channelMax(), frameMax()); connection->setCapacity(channelMax(), frameMax());
// send a tune-ok frame back // theoretically it is possible that the connection object gets destructed between sending the messages
ConnectionTuneOKFrame okframe(channelMax(), frameMax(), heartbeat()); Monitor monitor(connection);
// send it back // send it back
connection->send(okframe); connection->send(ConnectionTuneOKFrame(channelMax(), frameMax(), heartbeat()));
// check if the connection object still exists
if (!monitor.valid()) return true;
// and finally we start to open the frame // and finally we start to open the frame
ConnectionOpenFrame openframe(connection->vhost()); connection->send(ConnectionOpenFrame(connection->vhost()));
// send the open frame
connection->send(openframe);
// done // done
return true; return true;

View File

@ -116,6 +116,15 @@ public:
{ {
return _heartbeat; return _heartbeat;
} }
/**
* Is this a frame that is part of the connection setup?
* @return bool
*/
virtual bool partOfHandshake() const override
{
return true;
}
}; };
/** /**

View File

@ -1 +0,0 @@
..

View File

@ -42,6 +42,15 @@ public:
*/ */
virtual void fill(OutBuffer &buffer) const = 0; virtual void fill(OutBuffer &buffer) const = 0;
/**
* Is this a frame that is part of the connection setup?
* @return bool
*/
virtual bool partOfHandshake() const
{
return false;
}
/** /**
* Process the frame * Process the frame
* @param connection The connection over which it was received * @param connection The connection over which it was received

View File

@ -8,7 +8,7 @@
*/ */
// include the generic amqp functions // include the generic amqp functions
#include <copernica/amqp.h> #include "../libamqp.h"
// classes that are very commonly used // classes that are very commonly used
#include "frame.h" #include "frame.h"

1
src/libamqp Symbolic link
View File

@ -0,0 +1 @@
../include

View File

@ -123,6 +123,16 @@ public:
{ {
return _revision; return _revision;
} }
/**
* Is this a frame that is part of the connection setup?
* @return bool
*/
virtual bool partOfHandshake() const override
{
return true;
}
}; };
/** /**