added connection::maxFrame() and connection::expected() methods to give hints about the amount of data that the connection::parse() method ideally expects
This commit is contained in:
parent
7384b521ac
commit
59830c659c
|
|
@ -138,6 +138,34 @@ public:
|
||||||
return _implementation.parse(buffer);
|
return _implementation.parse(buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Max frame size
|
||||||
|
*
|
||||||
|
* If you allocate memory to receive data that you are going to pass to the parse() method,
|
||||||
|
* it might be useful to have an insight in the max frame size. The parse() method process
|
||||||
|
* one frame at a time, so you must at least be able to read in buffers of this specific
|
||||||
|
* frame size.
|
||||||
|
*
|
||||||
|
* @return size_t
|
||||||
|
*/
|
||||||
|
uint32_t maxFrame() const
|
||||||
|
{
|
||||||
|
return _implementation.maxFrame();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Expected number of bytes for the next parse() call.
|
||||||
|
*
|
||||||
|
* This method returns the number of bytes that the next call to parse() at least expects to
|
||||||
|
* do something meaningful with it.
|
||||||
|
*
|
||||||
|
* @return size_t
|
||||||
|
*/
|
||||||
|
uint32_t expected() const
|
||||||
|
{
|
||||||
|
return _implementation.expected();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Close the connection
|
* Close the connection
|
||||||
* This will close all channels
|
* This will close all channels
|
||||||
|
|
|
||||||
|
|
@ -79,6 +79,13 @@ protected:
|
||||||
*/
|
*/
|
||||||
uint32_t _maxFrame = 10000;
|
uint32_t _maxFrame = 10000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of expected bytes that will hold the next incoming frame
|
||||||
|
* We start with seven because that is the header of a frame
|
||||||
|
* @var uint32_t
|
||||||
|
*/
|
||||||
|
uint32_t _expected = 7;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The login for the server (login, password)
|
* The login for the server (login, password)
|
||||||
* @var Login
|
* @var Login
|
||||||
|
|
@ -239,6 +246,15 @@ public:
|
||||||
return _maxFrame - 8;
|
return _maxFrame - 8;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The number of bytes that can best be passed to the next call to the parse() method
|
||||||
|
* @return uint32_t
|
||||||
|
*/
|
||||||
|
uint32_t expected() const
|
||||||
|
{
|
||||||
|
return _expected;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a channel to the connection, and return the channel ID that it
|
* Add a channel to the connection, and return the channel ID that it
|
||||||
* is allowed to use, or 0 when no more ID's are available
|
* is allowed to use, or 0 when no more ID's are available
|
||||||
|
|
|
||||||
|
|
@ -128,9 +128,16 @@ public:
|
||||||
*/
|
*/
|
||||||
virtual ~ReceivedFrame() {}
|
virtual ~ReceivedFrame() {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Have we at least received the full frame header?
|
||||||
|
* The header contains the frame type, the channel ID and the payload size
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
bool header() const;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Is this a complete frame?
|
* Is this a complete frame?
|
||||||
* @return integer
|
* @return bool
|
||||||
*/
|
*/
|
||||||
bool complete() const;
|
bool complete() const;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -124,8 +124,10 @@ uint64_t ConnectionImpl::parse(const Buffer &buffer)
|
||||||
{
|
{
|
||||||
// try to recognize the frame
|
// try to recognize the frame
|
||||||
ReceivedFrame receivedFrame(ReducedBuffer(buffer, processed), _maxFrame);
|
ReceivedFrame receivedFrame(ReducedBuffer(buffer, processed), _maxFrame);
|
||||||
if (!receivedFrame.complete()) return processed;
|
|
||||||
|
|
||||||
|
// do we have the full frame?
|
||||||
|
if (receivedFrame.complete())
|
||||||
|
{
|
||||||
// process the frame
|
// process the frame
|
||||||
receivedFrame.process(this);
|
receivedFrame.process(this);
|
||||||
|
|
||||||
|
|
@ -135,6 +137,18 @@ uint64_t ConnectionImpl::parse(const Buffer &buffer)
|
||||||
// add bytes
|
// add bytes
|
||||||
processed += bytes;
|
processed += bytes;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// we do not yet have the complete frame, but if we do at least
|
||||||
|
// have the initial bytes of the header, we already know how much
|
||||||
|
// data we need for the next frame, otherwise we need at least 7
|
||||||
|
// bytes for processing the header of the next frame
|
||||||
|
_expected = receivedFrame.header() ? receivedFrame.totalSize() : 7;
|
||||||
|
|
||||||
|
// we're ready for now
|
||||||
|
return processed;
|
||||||
|
}
|
||||||
|
}
|
||||||
catch (const ProtocolException &exception)
|
catch (const ProtocolException &exception)
|
||||||
{
|
{
|
||||||
// something terrible happened on the protocol (like data out of range)
|
// something terrible happened on the protocol (like data out of range)
|
||||||
|
|
@ -146,7 +160,14 @@ uint64_t ConnectionImpl::parse(const Buffer &buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
// leap out if the connection object no longer exists
|
// leap out if the connection object no longer exists
|
||||||
if (!monitor.valid() || !_closed || _state != state_connected) return processed;
|
if (!monitor.valid()) return processed;
|
||||||
|
|
||||||
|
// the entire buffer has been processed, the next call to parse() should at least
|
||||||
|
// contain the size of the frame header to be meaningful for the amqp-cpp library
|
||||||
|
_expected = 7;
|
||||||
|
|
||||||
|
// if the connection is being closed, we have to do more stuff, otherwise we're ready now
|
||||||
|
if (!_closed || _state != state_connected) return processed;
|
||||||
|
|
||||||
// the close() function was called, but if the close frame was not yet sent
|
// the close() function was called, but if the close frame was not yet sent
|
||||||
// if there are no waiting channels, we can do that right now
|
// if there are no waiting channels, we can do that right now
|
||||||
|
|
|
||||||
|
|
@ -78,6 +78,7 @@
|
||||||
* Set up namespace
|
* Set up namespace
|
||||||
*/
|
*/
|
||||||
namespace AMQP {
|
namespace AMQP {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
* @param buffer Binary buffer
|
* @param buffer Binary buffer
|
||||||
|
|
@ -111,6 +112,15 @@ ReceivedFrame::ReceivedFrame(const Buffer &buffer, uint32_t max) : _buffer(buffe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Have we received the header of the frame
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
bool ReceivedFrame::header() const
|
||||||
|
{
|
||||||
|
return _buffer.size() >= 7;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Is this a complete frame?
|
* Is this a complete frame?
|
||||||
* @return integer
|
* @return integer
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue