From 1d25f0acbce7404d69b9a2bbdc24d74f1f783c04 Mon Sep 17 00:00:00 2001 From: Raoul Wols Date: Mon, 9 Aug 2021 16:55:03 +0200 Subject: [PATCH] Fixup sync handling for CopiedBuffer --- include/amqpcpp/channelimpl.h | 5 ++-- include/amqpcpp/copiedbuffer.h | 22 +++++++++++++++-- src/channelimpl.cpp | 43 ++++++++++++++++++++-------------- 3 files changed, 49 insertions(+), 21 deletions(-) diff --git a/include/amqpcpp/channelimpl.h b/include/amqpcpp/channelimpl.h index aacec9f..73bedc7 100644 --- a/include/amqpcpp/channelimpl.h +++ b/include/amqpcpp/channelimpl.h @@ -133,7 +133,7 @@ private: * * @var std::queue */ - std::queue> _queue; + std::queue _queue; /** * Are we currently operating in synchronous mode? Meaning: do we first have @@ -609,8 +609,9 @@ public: /** * Signal the channel that a synchronous operation was completed, and that any * queued frames can be sent out. + * @return false if an error on the connection level occurred, true if not */ - void flush(); + bool flush(); /** * Report to the handler that the channel is opened diff --git a/include/amqpcpp/copiedbuffer.h b/include/amqpcpp/copiedbuffer.h index 0716483..3bdd742 100644 --- a/include/amqpcpp/copiedbuffer.h +++ b/include/amqpcpp/copiedbuffer.h @@ -49,6 +49,12 @@ private: */ size_t _size = 0; + /** + * Whether the frame is synchronous + * @var bool + */ + bool _synchronous = false; + protected: /** @@ -72,7 +78,8 @@ public: */ CopiedBuffer(const Frame &frame) : _capacity(frame.totalSize()), - _buffer((char *)malloc(_capacity)) + _buffer((char *)malloc(_capacity)), + _synchronous(frame.synchronous()) { // tell the frame to fill this buffer frame.fill(*this); @@ -94,7 +101,8 @@ public: CopiedBuffer(CopiedBuffer &&that) : _capacity(that._capacity), _buffer(that._buffer), - _size(that._size) + _size(that._size), + _synchronous(that._synchronous) { // reset the other object that._buffer = nullptr; @@ -130,6 +138,16 @@ public: // expose member return _size; } + + /** + * Whether the frame is to be sent synchronously + * @return bool + */ + bool synchronous() const noexcept + { + // expose member + return _synchronous; + } }; /** diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 56b2d1a..79efeb4 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -750,18 +750,21 @@ bool ChannelImpl::send(CopiedBuffer &&frame) { // we need to wait until the synchronous frame has // been processed, so queue the frame until it was - _queue.emplace(false, std::move(frame)); + _queue.emplace(std::move(frame)); // it was of course not actually sent but we pretend // that it was, because no error occured return true; } + // is this a synchronous frame? + bool syncframe = frame.synchronous(); + // send to tcp connection if (!_connection->send(std::move(frame))) return false; - // frame was sent, a copied buffer cannot be synchronous - _synchronous = false; + // frame was sent, if this was a synchronous frame, we now have to wait + _synchronous = syncframe; // done return true; @@ -789,7 +792,7 @@ bool ChannelImpl::send(const Frame &frame) { // we need to wait until the synchronous frame has // been processed, so queue the frame until it was - _queue.emplace(frame.synchronous(), frame); + _queue.emplace(frame); // it was of course not actually sent but we pretend // that it was, because no error occured @@ -798,7 +801,7 @@ bool ChannelImpl::send(const Frame &frame) // send to tcp connection if (!_connection->send(frame)) return false; - + // frame was sent, if this was a synchronous frame, we now have to wait _synchronous = frame.synchronous(); @@ -821,7 +824,7 @@ uint32_t ChannelImpl::maxPayload() const * Signal the channel that a synchronous operation was completed. After * this operation, waiting frames can be sent out. */ -void ChannelImpl::flush() +bool ChannelImpl::flush() { // we are no longer waiting for synchronous operations _synchronous = false; @@ -832,21 +835,27 @@ void ChannelImpl::flush() // send all frames while not in synchronous mode while (_connection && !_synchronous && !_queue.empty()) { - // retrieve the first buffer and synchronous - auto &pair = _queue.front(); - - // mark as synchronous if necessary - _synchronous = pair.first; - - // send it over the connection - _connection->send(std::move(pair.second)); - - // the user space handler may have destructed this channel object - if (!monitor.valid()) return; + // retrieve the front item + auto buffer = std::move(_queue.front()); // remove from the list _queue.pop(); + + // is this a synchronous frame? + bool syncframe = buffer.synchronous(); + + // send to tcp connection + if (!_connection->send(std::move(buffer))) return false; + + // the user space handler may have destructed this channel object + if (!monitor.valid()) return true; + + // frame was sent, if this was a synchronous frame, we now have to wait + _synchronous = syncframe; } + + // done + return true; } /**