we now use passthrough buffers, so we no longer have to dynamically allocate a buffer for each and every outgoing message

This commit is contained in:
Emiel Bruijntjes 2017-03-09 00:18:53 +01:00
parent e59d0ff387
commit 1f5f641d8b
10 changed files with 355 additions and 170 deletions

View File

@ -19,7 +19,7 @@
#include "exchangetype.h"
#include "watchable.h"
#include "callbacks.h"
#include "outbuffer.h"
#include "copiedbuffer.h"
#include "deferred.h"
#include "monitor.h"
#include <memory>
@ -119,7 +119,7 @@ private:
*
* @var std::queue
*/
std::queue<std::pair<bool, OutBuffer>> _queue;
std::queue<std::pair<bool, CopiedBuffer>> _queue;
/**
* Are we currently operating in synchronous mode?

View File

@ -29,6 +29,7 @@ class Channel;
class Connection;
class ConnectionHandler;
class ConnectionImpl;
class CopiedBuffer;
class Exchange;
class Frame;
class Login;

View File

@ -19,7 +19,7 @@
#include "watchable.h"
#include "connectionhandler.h"
#include "channelimpl.h"
#include "outbuffer.h"
#include "copiedbuffer.h"
#include "monitor.h"
#include "login.h"
#include <unordered_map>
@ -122,7 +122,7 @@ protected:
* Queued messages that should be sent after the connection has been established
* @var queue
*/
std::queue<OutBuffer> _queue;
std::queue<CopiedBuffer> _queue;
/**
* Heartbeat delay
@ -348,7 +348,7 @@ public:
*
* @param buffer the buffer with data to send
*/
bool send(OutBuffer &&buffer);
bool send(const CopiedBuffer &buffer);
/**
* Get a channel by its identifier

145
include/copiedbuffer.h Normal file
View File

@ -0,0 +1,145 @@
/**
* CopiedBuffer.h
*
* If an output buffer (frame) cannot immediately be sent, we copy it to
* memory using this CopiedBuffer class
*
* @copyright 2017 Copernica BV
*/
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include <memory>
#include <cstring>
#include "endian.h"
#include "frame.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Class definition
*/
class CopiedBuffer : public OutBuffer
{
private:
/**
* The total capacity of the out buffer
* @var size_t
*/
size_t _capacity;
/**
* Pointer to the beginning of the buffer
* @var const char *
*/
char *_buffer;
/**
* Current size of the buffer
* @var size_t
*/
size_t _size = 0;
protected:
/**
* The method that adds the actual data
* @param data
* @param size
*/
virtual void append(const void *data, size_t size) override
{
// copy into the buffer
memcpy(_buffer + _size, data, size);
// update the size
_size += size;
}
public:
/**
* Constructor
* @param frame
*/
CopiedBuffer(const Frame &frame) :
_capacity(frame.totalSize()),
_buffer((char *)malloc(_capacity))
{
// tell the frame to fill this buffer
frame.fill(*this);
// append an end of frame byte (but not when still negotiating the protocol)
if (frame.needsSeparator()) add((uint8_t)206);
}
/**
* Copy constructor
* @param that
*/
CopiedBuffer(const CopiedBuffer &that) :
_capacity(that._capacity),
_buffer((char *)malloc(_capacity)),
_size(that._size)
{
// copy the data
memcpy(_buffer, that._buffer, _size);
}
/**
* Move constructor
* @param that
*/
CopiedBuffer(CopiedBuffer &&that) :
_capacity(that._capacity),
_buffer(that._buffer),
_size(that._size)
{
// reset the other object
that._buffer = nullptr;
that._size = 0;
that._capacity = 0;
}
/**
* Destructor
*/
virtual ~CopiedBuffer()
{
// deallocate the buffer
free(_buffer);
}
/**
* Get access to the internal buffer
* @return const char*
*/
const char *data() const
{
// expose member
return _buffer;
}
/**
* Current size of the output buffer
* @return size_t
*/
size_t size() const
{
// expose member
return _size;
}
};
/**
* End of namespace
*/
}

View File

@ -4,7 +4,7 @@
* This is a utility class for writing various data types to a binary
* string, and converting the values to network byte order
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2017 Copernica BV
*/
/**
@ -18,6 +18,7 @@
#include <memory>
#include <cstring>
#include "endian.h"
#include "frame.h"
/**
* Set up namespace
@ -29,97 +30,15 @@ namespace AMQP {
*/
class OutBuffer
{
private:
protected:
/**
* Pointer to the beginning of the buffer
* @var std::unique_ptr<char[]>
* The method that adds the actual data
* @param data
* @param size
*/
std::unique_ptr<char[]> _buffer;
/**
* Pointer to the buffer to be filled
* @var char*
*/
char *_current;
/**
* Current size of the buffer
* @var size_t
*/
size_t _size;
/**
* The total capacity of the out buffer
* @var size_t
*/
size_t _capacity;
virtual void append(const void *data, size_t size) = 0;
public:
/**
* Constructor
* @param capacity
*/
OutBuffer(uint32_t capacity) :
_buffer(new char[capacity]),
_current(_buffer.get()),
_size(0),
_capacity(capacity)
{}
/**
* Copy constructor
* @param that
*/
OutBuffer(const OutBuffer &that) :
_buffer(new char[that._capacity]),
_current(_buffer.get() + that._size),
_size(that._size),
_capacity(that._capacity)
{
// copy memory
memcpy(_buffer.get(), that._buffer.get(), _size);
}
/**
* Move constructor
* @param that
*/
OutBuffer(OutBuffer &&that) :
_buffer(std::move(that._buffer)),
_current(that._current),
_size(that._size),
_capacity(that._capacity)
{
// reset the other object
that._size = 0;
that._capacity = 0;
that._current = nullptr;
}
/**
* Destructor
*/
virtual ~OutBuffer() = default;
/**
* Get access to the internal buffer
* @return const char*
*/
const char *data() const
{
return _buffer.get();
}
/**
* Current size of the output buffer
* @return size_t
*/
size_t size() const
{
return _size;
}
/**
* Add a binary buffer to the buffer
* @param string char* to the string
@ -127,9 +46,8 @@ public:
*/
void add(const char *string, uint32_t size)
{
memcpy(_current, string, size);
_current += size;
_size += size;
// append data
append(string, size);
}
/**
@ -139,7 +57,8 @@ public:
*/
void add(const std::string &string)
{
add(string.c_str(), string.size());
// add data
append(string.c_str(), string.size());
}
/**
@ -148,9 +67,8 @@ public:
*/
void add(uint8_t value)
{
memcpy(_current, &value, sizeof(value));
_current += sizeof(value);
_size += sizeof(value);
// append one byte
append(&value, sizeof(value));
}
/**
@ -159,10 +77,11 @@ public:
*/
void add(uint16_t value)
{
// convert to network byte order
uint16_t v = htobe16(value);
memcpy(_current, &v, sizeof(v));
_current += sizeof(v);
_size += sizeof(v);
// append the data
append(&v, sizeof(v));
}
/**
@ -171,10 +90,11 @@ public:
*/
void add(uint32_t value)
{
// convert to network byte order
uint32_t v = htobe32(value);
memcpy(_current, &v, sizeof(v));
_current += sizeof(v);
_size += sizeof(v);
// append the data
append(&v, sizeof(v));
}
/**
@ -183,10 +103,11 @@ public:
*/
void add(uint64_t value)
{
// convert to network byte order
uint64_t v = htobe64(value);
memcpy(_current, &v, sizeof(v));
_current += sizeof(v);
_size += sizeof(v);
// append the data
append(&v, sizeof(v));
}
/**
@ -195,9 +116,8 @@ public:
*/
void add(int8_t value)
{
memcpy(_current, &value, sizeof(value));
_current += sizeof(value);
_size += sizeof(value);
// append the data
append(&value, sizeof(value));
}
/**
@ -206,10 +126,11 @@ public:
*/
void add(int16_t value)
{
// convert to network byte order
int16_t v = htobe16(value);
memcpy(_current, &v, sizeof(v));
_current += sizeof(v);
_size += sizeof(v);
// append the data
append(&v, sizeof(v));
}
/**
@ -218,10 +139,11 @@ public:
*/
void add(int32_t value)
{
// convert into network byte order
int32_t v = htobe32(value);
memcpy(_current, &v, sizeof(v));
_current += sizeof(v);
_size += sizeof(v);
// append the data
append(&v, sizeof(v));
}
/**
@ -230,10 +152,11 @@ public:
*/
void add(int64_t value)
{
// copy into the buffer
int64_t v = htobe64(value);
memcpy(_current, &v, sizeof(v));
_current += sizeof(v);
_size += sizeof(v);
// append the data
append(&v, sizeof(v));
}
/**
@ -242,9 +165,8 @@ public:
*/
void add(float value)
{
memcpy(_current, &value, sizeof(value));
_current += sizeof(value);
_size += sizeof(value);
// append the data
append(&value, sizeof(value));
}
/**
@ -253,9 +175,8 @@ public:
*/
void add(double value)
{
memcpy(_current, &value, sizeof(value));
_current += sizeof(value);
_size += sizeof(value);
// append the data
append(&value, sizeof(value));
}
};

View File

@ -694,7 +694,7 @@ bool ChannelImpl::send(const Frame &frame)
{
// we need to wait until the synchronous frame has
// been processed, so queue the frame until it was
_queue.emplace(frame.synchronous(), frame.buffer());
_queue.emplace(frame.synchronous(), frame);
// it was of course not actually sent but we pretend
// that it was, because no error occured
@ -724,19 +724,22 @@ void ChannelImpl::onSynchronized()
Monitor monitor(this);
// send all frames while not in synchronous mode
while (monitor.valid() && _connection && !_synchronous && !_queue.empty())
while (_connection && !_synchronous && !_queue.empty())
{
// retrieve the first buffer and synchronous
auto pair = std::move(_queue.front());
// remove from the list
_queue.pop();
const auto &pair = _queue.front();
// mark as synchronous if necessary
_synchronous = pair.first;
// send it over the connection
_connection->send(std::move(pair.second));
_connection->send(pair.second);
// the user space handler may have destructed the channel
if (!monitor.valid()) return;
// remove from the list
_queue.pop();
}
}

View File

@ -10,6 +10,7 @@
#include "connectioncloseokframe.h"
#include "connectioncloseframe.h"
#include "reducedbuffer.h"
#include "passthroughbuffer.h"
/**
* set namespace
@ -257,22 +258,25 @@ void ConnectionImpl::setConnected()
// inform handler
_handler->onConnected(_parent);
// the handler could have destructed us
if (!monitor.valid()) return;
// empty the queue of messages
while (monitor.valid() && !_queue.empty())
while (!_queue.empty())
{
// get the next message
OutBuffer buffer(std::move(_queue.front()));
// remove it from the queue
_queue.pop();
const auto &buffer = _queue.front();
// send it
_handler->onData(_parent, buffer.data(), buffer.size());
}
// stop if monitor is gone
if (!monitor.valid()) return;
// leap out if object is dead
if (!monitor.valid()) return;
// remove it from the queue
_queue.pop();
}
// if the close method was called before, and no channel is waiting
// for an answer, we can now safely send out the close frame
@ -328,19 +332,16 @@ bool ConnectionImpl::send(const Frame &frame)
// it is impossible to send out this frame successfully
if (frame.totalSize() > _maxFrame) return false;
// we need an output buffer
OutBuffer buffer(frame.buffer());
// are we still setting up the connection?
if ((_state == state_connected && _queue.empty()) || frame.partOfHandshake())
{
// send the buffer
_handler->onData(_parent, buffer.data(), buffer.size());
// we need an output buffer (this will immediately send the data)
PassthroughBuffer buffer(_parent, _handler, frame);
}
else
{
// the connection is still being set up, so we need to delay the message sending
_queue.push(std::move(buffer));
_queue.emplace(frame);
}
// done
@ -352,7 +353,7 @@ bool ConnectionImpl::send(const Frame &frame)
*
* @param buffer the buffer with data to send
*/
bool ConnectionImpl::send(OutBuffer &&buffer)
bool ConnectionImpl::send(const CopiedBuffer &buffer)
{
// this only works when we are already connected
if (_state != state_connected) return false;
@ -366,7 +367,7 @@ bool ConnectionImpl::send(OutBuffer &&buffer)
else
{
// add to the list of waiting buffers
_queue.push(std::move(buffer));
_queue.push(buffer);
}
// done

View File

@ -15,7 +15,6 @@
/**
* Dependencies
*/
#include "../include/outbuffer.h"
#include "protocolexception.h"
/**
@ -84,25 +83,6 @@ public:
*/
virtual bool synchronous() const { return false; }
/**
* Retrieve the buffer in AMQP wire-format for
* sending over the socket connection
*/
OutBuffer buffer() const
{
// we need an output buffer
OutBuffer buffer(totalSize());
// fill the buffer
fill(buffer);
// append an end of frame byte (but not when still negotiating the protocol)
if (needsSeparator()) buffer.add((uint8_t)206);
// return the created buffer
return buffer;
}
/**
* Process the frame
* @param connection The connection over which it was received

View File

@ -38,6 +38,7 @@
#include "../include/bytebuffer.h"
#include "../include/receivedframe.h"
#include "../include/outbuffer.h"
#include "../include/copiedbuffer.h"
#include "../include/watchable.h"
#include "../include/monitor.h"
#include "../include/tcpdefines.h"

133
src/passthroughbuffer.h Normal file
View File

@ -0,0 +1,133 @@
/**
* PassthroughBuffer.h
*
* If we can immediately pass on data to the TCP layer, we use a passthrough
* buffer so that we do not have to dynamically allocate memory
*
* @copyright 2017 Copernica BV
*/
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include <memory>
#include <cstring>
#include "endian.h"
#include "frame.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Class definition
*/
class PassthroughBuffer : public OutBuffer
{
private:
/**
* The actual buffer
* @var const char *
*/
char _buffer[4096];
/**
* Current size of the buffer
* @var size_t
*/
size_t _size = 0;
/**
* Connection object (needs to be passed to the handler)
* @var Connection
*/
Connection *_connection;
/**
* Object that will send the data when the buffer is full
* @var ConnectionHandler
*/
ConnectionHandler *_handler;
/**
* Flush the object
*/
void flush()
{
// notify the handler
_handler->onData(_connection, _buffer, _size);
// all data has been sent
_size = 0;
}
protected:
/**
* The method that adds the actual data
* @param data
* @param size
*/
virtual void append(const void *data, size_t size) override
{
// flush existing buffers if data would not fit
if (_size > 0 && _size + size > 4096) flush();
// if data would not fit anyway, we send it immediately
if (size > 4096) return _handler->onData(_connection, (const char *)data, size);
// copy data into the buffer
memcpy(_buffer + _size, data, size);
// update the size
_size += size;
}
public:
/**
* Constructor
* @param connection
* @param handler
* @param frame
*/
PassthroughBuffer(Connection *connection, ConnectionHandler *handler, const Frame &frame) : _connection(connection), _handler(handler)
{
// tell the frame to fill this buffer
frame.fill(*this);
// append an end of frame byte (but not when still negotiating the protocol)
if (frame.needsSeparator()) add((uint8_t)206);
}
/**
* No copying, because that would be too expensive
* @param that
*/
PassthroughBuffer(const CopiedBuffer &that) = delete;
/**
* Moving is also not necessary
* @param that
*/
PassthroughBuffer(CopiedBuffer &&that) = delete;
/**
* Destructor
*/
virtual ~PassthroughBuffer()
{
// pass data to the handler
if (_size > 0) flush();
}
};
/**
* End of namespace
*/
}