From 24c8527b5d7cf6447d714140ff8fa2ac12bf5d26 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Wed, 15 Jun 2016 13:32:30 -0400 Subject: [PATCH] added tcpinbuffer with much less allocation --- include/bytebuffer.h | 40 +++++++++++++++++++++++++++++++++++++++- include/connectionimpl.h | 2 +- include/tcpconnection.h | 20 +++++++++++++++++++- src/tcpconnected.h | 29 ++++++++++++++++++++++------- src/tcpstate.h | 2 +- 5 files changed, 82 insertions(+), 11 deletions(-) diff --git a/include/bytebuffer.h b/include/bytebuffer.h index 75c8776..0a9964b 100644 --- a/include/bytebuffer.h +++ b/include/bytebuffer.h @@ -23,7 +23,7 @@ namespace AMQP { */ class ByteBuffer : public Buffer { -private: +protected: /** * The actual byte buffer * @var const char * @@ -44,11 +44,49 @@ public: */ ByteBuffer(const char *data, size_t size) : _data(data), _size(size) {} + /** + * No copy'ing + * @param that + */ + ByteBuffer(const ByteBuffer &that) = delete; + + /** + * Move constructor + * @param that + */ + ByteBuffer(ByteBuffer &&that) : _data(that._data), _size(that._size) + { + // reset other object + that._data = nullptr; + that._size = 0; + } + /** * Destructor */ virtual ~ByteBuffer() {} + /** + * Move assignment operator + * @param that + */ + ByteBuffer &operator=(ByteBuffer &&that) + { + // skip self-assignment + if (this == &that) return *this; + + // copy members + _data = that._data; + _size = that._size; + + // reset other object + that._data = nullptr; + that._size = 0; + + // done + return *this; + } + /** * Total size of the buffer * @return size_t diff --git a/include/connectionimpl.h b/include/connectionimpl.h index 13a8df0..7366264 100644 --- a/include/connectionimpl.h +++ b/include/connectionimpl.h @@ -77,7 +77,7 @@ protected: * Max frame size * @var uint32_t */ - uint32_t _maxFrame = 10000; + uint32_t _maxFrame = 4096; /** * Number of expected bytes that will hold the next incoming frame diff --git a/include/tcpconnection.h b/include/tcpconnection.h index cfbf2ee..b0ebd19 100644 --- a/include/tcpconnection.h +++ b/include/tcpconnection.h @@ -5,7 +5,7 @@ * IO between the client application and the RabbitMQ server. * * @author Emiel Bruijntjes - * @copyright 2015 Copernica BV + * @copyright 2015 - 2016 Copernica BV */ /** @@ -141,6 +141,24 @@ public: return _connection.close(); } + /** + * The max frame size + * @return uint32_t + */ + uint32_t maxFrame() const + { + return _connection.maxFrame(); + } + + /** + * The number of bytes that can best be passed to the next call to the parse() method + * @return uint32_t + */ + uint32_t expected() const + { + return _connection.expected(); + } + /** * Return the amount of channels this connection has * @return std::size_t diff --git a/src/tcpconnected.h b/src/tcpconnected.h index 873505f..1f4c7b3 100644 --- a/src/tcpconnected.h +++ b/src/tcpconnected.h @@ -5,7 +5,7 @@ * the hostname was resolved into an IP address * * @author Emiel Bruijntjes - * @copyright 2015 Copernica BV + * @copyright 2015 - 2016 Copernica BV */ /** @@ -17,6 +17,7 @@ * Dependencies */ #include "tcpbuffer.h" +#include "tcpinbuffer.h" /** * Set up namespace @@ -43,10 +44,9 @@ private: /** * An incoming buffer - * @var TcpBuffer + * @var TcpInBuffer */ - TcpBuffer _in; - + TcpInBuffer _in; /** * Helper method to report an error @@ -88,7 +88,8 @@ public: TcpConnected(TcpConnection *connection, int socket, TcpBuffer &&buffer, TcpHandler *handler) : TcpState(connection, handler), _socket(socket), - _out(std::move(buffer)) + _out(std::move(buffer)), + _in(4096) { // if there is already an output buffer, we have to send out that first if (_out) _out.sendto(_socket); @@ -141,14 +142,14 @@ public: if (flags & readable) { // read data from buffer - ssize_t result = _in.receivefrom(_socket); + ssize_t result = _in.receivefrom(_socket, _connection->expected()); // are we in an error state? if (result < 0 && reportError()) return nextState(monitor); // we need a local copy of the buffer - because it is possible that "this" // object gets destructed halfway through the call to the parse() method - TcpBuffer buffer(std::move(_in)); + TcpInBuffer buffer(std::move(_in)); // parse the buffer auto processed = _connection->parse(buffer); @@ -192,6 +193,20 @@ public: // start monitoring the socket to find out when it is writable _handler->monitor(_connection, _socket, readable | writable); } + + /** + * Report that heartbeat negotiation is going on + * @param heartbeat suggested heartbeat + * @return uint16_t accepted heartbeat + */ + virtual uint16_t reportNegotiate(uint16_t heartbeat) override + { + // allocate a buffer that is big enough for the biggest possible frame + _in.reallocate(_connection->maxFrame()); + + // pass to base + return TcpState::reportNegotiate(heartbeat); + } }; /** diff --git a/src/tcpstate.h b/src/tcpstate.h index e8ee827..bb81759 100644 --- a/src/tcpstate.h +++ b/src/tcpstate.h @@ -84,7 +84,7 @@ public: * @param heartbeat suggested heartbeat * @return uint16_t accepted heartbeat */ - uint16_t reportNegotiate(uint16_t heartbeat) + virtual uint16_t reportNegotiate(uint16_t heartbeat) { // pass to handler return _handler->onNegotiate(_connection, heartbeat);