added tcpinbuffer with much less allocation

This commit is contained in:
Emiel Bruijntjes 2016-06-15 13:32:30 -04:00
parent 6320e2ae0e
commit 24c8527b5d
5 changed files with 82 additions and 11 deletions

View File

@ -23,7 +23,7 @@ namespace AMQP {
*/ */
class ByteBuffer : public Buffer class ByteBuffer : public Buffer
{ {
private: protected:
/** /**
* The actual byte buffer * The actual byte buffer
* @var const char * * @var const char *
@ -44,11 +44,49 @@ public:
*/ */
ByteBuffer(const char *data, size_t size) : _data(data), _size(size) {} ByteBuffer(const char *data, size_t size) : _data(data), _size(size) {}
/**
* No copy'ing
* @param that
*/
ByteBuffer(const ByteBuffer &that) = delete;
/**
* Move constructor
* @param that
*/
ByteBuffer(ByteBuffer &&that) : _data(that._data), _size(that._size)
{
// reset other object
that._data = nullptr;
that._size = 0;
}
/** /**
* Destructor * Destructor
*/ */
virtual ~ByteBuffer() {} virtual ~ByteBuffer() {}
/**
* Move assignment operator
* @param that
*/
ByteBuffer &operator=(ByteBuffer &&that)
{
// skip self-assignment
if (this == &that) return *this;
// copy members
_data = that._data;
_size = that._size;
// reset other object
that._data = nullptr;
that._size = 0;
// done
return *this;
}
/** /**
* Total size of the buffer * Total size of the buffer
* @return size_t * @return size_t

View File

@ -77,7 +77,7 @@ protected:
* Max frame size * Max frame size
* @var uint32_t * @var uint32_t
*/ */
uint32_t _maxFrame = 10000; uint32_t _maxFrame = 4096;
/** /**
* Number of expected bytes that will hold the next incoming frame * Number of expected bytes that will hold the next incoming frame

View File

@ -5,7 +5,7 @@
* IO between the client application and the RabbitMQ server. * IO between the client application and the RabbitMQ server.
* *
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com> * @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2015 Copernica BV * @copyright 2015 - 2016 Copernica BV
*/ */
/** /**
@ -141,6 +141,24 @@ public:
return _connection.close(); return _connection.close();
} }
/**
* The max frame size
* @return uint32_t
*/
uint32_t maxFrame() const
{
return _connection.maxFrame();
}
/**
* The number of bytes that can best be passed to the next call to the parse() method
* @return uint32_t
*/
uint32_t expected() const
{
return _connection.expected();
}
/** /**
* Return the amount of channels this connection has * Return the amount of channels this connection has
* @return std::size_t * @return std::size_t

View File

@ -5,7 +5,7 @@
* the hostname was resolved into an IP address * the hostname was resolved into an IP address
* *
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com> * @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2015 Copernica BV * @copyright 2015 - 2016 Copernica BV
*/ */
/** /**
@ -17,6 +17,7 @@
* Dependencies * Dependencies
*/ */
#include "tcpbuffer.h" #include "tcpbuffer.h"
#include "tcpinbuffer.h"
/** /**
* Set up namespace * Set up namespace
@ -43,10 +44,9 @@ private:
/** /**
* An incoming buffer * An incoming buffer
* @var TcpBuffer * @var TcpInBuffer
*/ */
TcpBuffer _in; TcpInBuffer _in;
/** /**
* Helper method to report an error * Helper method to report an error
@ -88,7 +88,8 @@ public:
TcpConnected(TcpConnection *connection, int socket, TcpBuffer &&buffer, TcpHandler *handler) : TcpConnected(TcpConnection *connection, int socket, TcpBuffer &&buffer, TcpHandler *handler) :
TcpState(connection, handler), TcpState(connection, handler),
_socket(socket), _socket(socket),
_out(std::move(buffer)) _out(std::move(buffer)),
_in(4096)
{ {
// if there is already an output buffer, we have to send out that first // if there is already an output buffer, we have to send out that first
if (_out) _out.sendto(_socket); if (_out) _out.sendto(_socket);
@ -141,14 +142,14 @@ public:
if (flags & readable) if (flags & readable)
{ {
// read data from buffer // read data from buffer
ssize_t result = _in.receivefrom(_socket); ssize_t result = _in.receivefrom(_socket, _connection->expected());
// are we in an error state? // are we in an error state?
if (result < 0 && reportError()) return nextState(monitor); if (result < 0 && reportError()) return nextState(monitor);
// we need a local copy of the buffer - because it is possible that "this" // we need a local copy of the buffer - because it is possible that "this"
// object gets destructed halfway through the call to the parse() method // object gets destructed halfway through the call to the parse() method
TcpBuffer buffer(std::move(_in)); TcpInBuffer buffer(std::move(_in));
// parse the buffer // parse the buffer
auto processed = _connection->parse(buffer); auto processed = _connection->parse(buffer);
@ -192,6 +193,20 @@ public:
// start monitoring the socket to find out when it is writable // start monitoring the socket to find out when it is writable
_handler->monitor(_connection, _socket, readable | writable); _handler->monitor(_connection, _socket, readable | writable);
} }
/**
* Report that heartbeat negotiation is going on
* @param heartbeat suggested heartbeat
* @return uint16_t accepted heartbeat
*/
virtual uint16_t reportNegotiate(uint16_t heartbeat) override
{
// allocate a buffer that is big enough for the biggest possible frame
_in.reallocate(_connection->maxFrame());
// pass to base
return TcpState::reportNegotiate(heartbeat);
}
}; };
/** /**

View File

@ -84,7 +84,7 @@ public:
* @param heartbeat suggested heartbeat * @param heartbeat suggested heartbeat
* @return uint16_t accepted heartbeat * @return uint16_t accepted heartbeat
*/ */
uint16_t reportNegotiate(uint16_t heartbeat) virtual uint16_t reportNegotiate(uint16_t heartbeat)
{ {
// pass to handler // pass to handler
return _handler->onNegotiate(_connection, heartbeat); return _handler->onNegotiate(_connection, heartbeat);