Fixup sync handling for CopiedBuffer

This commit is contained in:
Raoul Wols 2021-08-09 16:55:03 +02:00
parent a09e33a609
commit 1d25f0acbc
No known key found for this signature in database
GPG Key ID: 9FFE06A0F6AAA2DF
3 changed files with 49 additions and 21 deletions

View File

@ -133,7 +133,7 @@ private:
* *
* @var std::queue * @var std::queue
*/ */
std::queue<std::pair<bool, CopiedBuffer>> _queue; std::queue<CopiedBuffer> _queue;
/** /**
* Are we currently operating in synchronous mode? Meaning: do we first have * Are we currently operating in synchronous mode? Meaning: do we first have
@ -609,8 +609,9 @@ public:
/** /**
* Signal the channel that a synchronous operation was completed, and that any * Signal the channel that a synchronous operation was completed, and that any
* queued frames can be sent out. * queued frames can be sent out.
* @return false if an error on the connection level occurred, true if not
*/ */
void flush(); bool flush();
/** /**
* Report to the handler that the channel is opened * Report to the handler that the channel is opened

View File

@ -49,6 +49,12 @@ private:
*/ */
size_t _size = 0; size_t _size = 0;
/**
* Whether the frame is synchronous
* @var bool
*/
bool _synchronous = false;
protected: protected:
/** /**
@ -72,7 +78,8 @@ public:
*/ */
CopiedBuffer(const Frame &frame) : CopiedBuffer(const Frame &frame) :
_capacity(frame.totalSize()), _capacity(frame.totalSize()),
_buffer((char *)malloc(_capacity)) _buffer((char *)malloc(_capacity)),
_synchronous(frame.synchronous())
{ {
// tell the frame to fill this buffer // tell the frame to fill this buffer
frame.fill(*this); frame.fill(*this);
@ -94,7 +101,8 @@ public:
CopiedBuffer(CopiedBuffer &&that) : CopiedBuffer(CopiedBuffer &&that) :
_capacity(that._capacity), _capacity(that._capacity),
_buffer(that._buffer), _buffer(that._buffer),
_size(that._size) _size(that._size),
_synchronous(that._synchronous)
{ {
// reset the other object // reset the other object
that._buffer = nullptr; that._buffer = nullptr;
@ -130,6 +138,16 @@ public:
// expose member // expose member
return _size; return _size;
} }
/**
* Whether the frame is to be sent synchronously
* @return bool
*/
bool synchronous() const noexcept
{
// expose member
return _synchronous;
}
}; };
/** /**

View File

@ -750,18 +750,21 @@ bool ChannelImpl::send(CopiedBuffer &&frame)
{ {
// we need to wait until the synchronous frame has // we need to wait until the synchronous frame has
// been processed, so queue the frame until it was // been processed, so queue the frame until it was
_queue.emplace(false, std::move(frame)); _queue.emplace(std::move(frame));
// it was of course not actually sent but we pretend // it was of course not actually sent but we pretend
// that it was, because no error occured // that it was, because no error occured
return true; return true;
} }
// is this a synchronous frame?
bool syncframe = frame.synchronous();
// send to tcp connection // send to tcp connection
if (!_connection->send(std::move(frame))) return false; if (!_connection->send(std::move(frame))) return false;
// frame was sent, a copied buffer cannot be synchronous // frame was sent, if this was a synchronous frame, we now have to wait
_synchronous = false; _synchronous = syncframe;
// done // done
return true; return true;
@ -789,7 +792,7 @@ bool ChannelImpl::send(const Frame &frame)
{ {
// we need to wait until the synchronous frame has // we need to wait until the synchronous frame has
// been processed, so queue the frame until it was // been processed, so queue the frame until it was
_queue.emplace(frame.synchronous(), frame); _queue.emplace(frame);
// it was of course not actually sent but we pretend // it was of course not actually sent but we pretend
// that it was, because no error occured // that it was, because no error occured
@ -821,7 +824,7 @@ uint32_t ChannelImpl::maxPayload() const
* Signal the channel that a synchronous operation was completed. After * Signal the channel that a synchronous operation was completed. After
* this operation, waiting frames can be sent out. * this operation, waiting frames can be sent out.
*/ */
void ChannelImpl::flush() bool ChannelImpl::flush()
{ {
// we are no longer waiting for synchronous operations // we are no longer waiting for synchronous operations
_synchronous = false; _synchronous = false;
@ -832,21 +835,27 @@ void ChannelImpl::flush()
// send all frames while not in synchronous mode // send all frames while not in synchronous mode
while (_connection && !_synchronous && !_queue.empty()) while (_connection && !_synchronous && !_queue.empty())
{ {
// retrieve the first buffer and synchronous // retrieve the front item
auto &pair = _queue.front(); auto buffer = std::move(_queue.front());
// mark as synchronous if necessary
_synchronous = pair.first;
// send it over the connection
_connection->send(std::move(pair.second));
// the user space handler may have destructed this channel object
if (!monitor.valid()) return;
// remove from the list // remove from the list
_queue.pop(); _queue.pop();
// is this a synchronous frame?
bool syncframe = buffer.synchronous();
// send to tcp connection
if (!_connection->send(std::move(buffer))) return false;
// the user space handler may have destructed this channel object
if (!monitor.valid()) return true;
// frame was sent, if this was a synchronous frame, we now have to wait
_synchronous = syncframe;
} }
// done
return true;
} }
/** /**