From c82fce8ec9f48e5997326fd02bd582c680288fe3 Mon Sep 17 00:00:00 2001 From: Michael van der Werve Date: Wed, 19 Jun 2019 09:38:25 +0200 Subject: [PATCH] fix for incorrect handling of synchronous flag --- include/amqpcpp/channelimpl.h | 17 +++++++++++++++-- src/basiccancelokframe.h | 2 +- src/basicconsumeokframe.h | 2 +- src/basicgetemptyframe.h | 2 +- src/basicqosokframe.h | 2 +- src/channelcloseokframe.h | 2 +- src/channelflowokframe.h | 2 +- src/confirmselectokframe.h | 2 +- src/exchangebindokframe.h | 2 +- src/exchangedeclareokframe.h | 2 +- src/exchangedeleteokframe.h | 2 +- src/exchangeunbindokframe.h | 2 +- src/queuebindokframe.h | 2 +- src/queuedeclareokframe.h | 2 +- src/queuedeleteokframe.h | 2 +- src/queuepurgeokframe.h | 2 +- src/queueunbindokframe.h | 2 +- src/transactioncommitokframe.h | 2 +- src/transactionrollbackokframe.h | 2 +- src/transactionselectokframe.h | 2 +- 20 files changed, 34 insertions(+), 21 deletions(-) diff --git a/include/amqpcpp/channelimpl.h b/include/amqpcpp/channelimpl.h index 71a08a3..89b9390 100644 --- a/include/amqpcpp/channelimpl.h +++ b/include/amqpcpp/channelimpl.h @@ -645,8 +645,17 @@ public: // skip if there is no oldest callback if (!_oldestCallback) return true; - // the last (possibly synchronous) operation was received, so we're no longer in synchronous mode - if (_synchronous && _queue.empty()) _synchronous = false; + // remember whether or not we were synchronous + bool synchronous = _synchronous; + + // is the queue empty at this moment? + bool empty = _queue.empty(); + + // the last (possibly synchronous) operation was received, so we're no longer in synchronous mode. this + // is an optimization that makes sure that the first instruction _after_ a synchronous instruction + // that is installed during the success callback we make later does _not_ need to be buffered, but can be + // sent directly + if (synchronous && empty) _synchronous = false; // we are going to call callbacks that could destruct the channel Monitor monitor(this); @@ -661,6 +670,10 @@ public: // leap out if channel no longer exists if (!monitor.valid()) return false; + // if we were synchronous, but there were still messages in the queue, we process the queue now, because the synchronous + // operation was finished, and its callback was made, which means we're no longer in synchronous mode + if (synchronous && !empty) onSynchronized(); + // set the oldest callback _oldestCallback = next; diff --git a/src/basiccancelokframe.h b/src/basiccancelokframe.h index a894c5c..d6b31ea 100644 --- a/src/basiccancelokframe.h +++ b/src/basiccancelokframe.h @@ -94,7 +94,7 @@ public: if (!channel) return false; // report - if (channel->reportSuccess(consumerTag())) channel->onSynchronized(); + channel->reportSuccess(consumerTag()); // done return true; diff --git a/src/basicconsumeokframe.h b/src/basicconsumeokframe.h index 9ec6397..129419d 100644 --- a/src/basicconsumeokframe.h +++ b/src/basicconsumeokframe.h @@ -94,7 +94,7 @@ public: if (!channel) return false; // report - if (channel->reportSuccess(consumerTag())) channel->onSynchronized(); + channel->reportSuccess(consumerTag()); // done return true; diff --git a/src/basicgetemptyframe.h b/src/basicgetemptyframe.h index 656c2df..3efa975 100644 --- a/src/basicgetemptyframe.h +++ b/src/basicgetemptyframe.h @@ -84,7 +84,7 @@ public: if (!channel) return false; // report - if (channel->reportSuccess()) channel->onSynchronized(); + channel->reportSuccess(); // done return true; diff --git a/src/basicqosokframe.h b/src/basicqosokframe.h index 6698668..72aa90a 100644 --- a/src/basicqosokframe.h +++ b/src/basicqosokframe.h @@ -67,7 +67,7 @@ public: if (!channel) return false; // report - if (channel->reportSuccess()) channel->onSynchronized(); + channel->reportSuccess(); // done return true; diff --git a/src/channelcloseokframe.h b/src/channelcloseokframe.h index 1c8bd1e..9be59a9 100644 --- a/src/channelcloseokframe.h +++ b/src/channelcloseokframe.h @@ -72,7 +72,7 @@ public: if (!channel) return false; // report that the channel is closed - if (channel->reportClosed()) channel->onSynchronized(); + channel->reportClosed(); // done return true; diff --git a/src/channelflowokframe.h b/src/channelflowokframe.h index 2111253..d5c5aa0 100644 --- a/src/channelflowokframe.h +++ b/src/channelflowokframe.h @@ -93,7 +93,7 @@ public: if (!channel) return false; // report success for the call - if (channel->reportSuccess()) channel->onSynchronized(); + channel->reportSuccess(); // done return true; diff --git a/src/confirmselectokframe.h b/src/confirmselectokframe.h index 4a20992..c91c018 100644 --- a/src/confirmselectokframe.h +++ b/src/confirmselectokframe.h @@ -75,7 +75,7 @@ public: if(!channel) return false; // report that the channel is open - if (channel->reportSuccess()) channel->onSynchronized(); + channel->reportSuccess(); // done return true; diff --git a/src/exchangebindokframe.h b/src/exchangebindokframe.h index 6116eca..bbba17c 100644 --- a/src/exchangebindokframe.h +++ b/src/exchangebindokframe.h @@ -67,7 +67,7 @@ public: if(!channel) return false; // report to handler - if (channel->reportSuccess()) channel->onSynchronized(); + channel->reportSuccess(); // done return true; diff --git a/src/exchangedeclareokframe.h b/src/exchangedeclareokframe.h index 8334757..8d56030 100644 --- a/src/exchangedeclareokframe.h +++ b/src/exchangedeclareokframe.h @@ -70,7 +70,7 @@ public: if(!channel) return false; // report exchange declare ok - if (channel->reportSuccess()) channel->onSynchronized(); + channel->reportSuccess(); // done return true; diff --git a/src/exchangedeleteokframe.h b/src/exchangedeleteokframe.h index 7f6dba0..624cc84 100644 --- a/src/exchangedeleteokframe.h +++ b/src/exchangedeleteokframe.h @@ -71,7 +71,7 @@ public: if(!channel) return false; // report to handler - if (channel->reportSuccess()) channel->onSynchronized(); + channel->reportSuccess(); // done return true; diff --git a/src/exchangeunbindokframe.h b/src/exchangeunbindokframe.h index 511b8fe..3a30050 100644 --- a/src/exchangeunbindokframe.h +++ b/src/exchangeunbindokframe.h @@ -68,7 +68,7 @@ public: if(!channel) return false; // report to handler - if (channel->reportSuccess()) channel->onSynchronized(); + channel->reportSuccess(); // done return true; diff --git a/src/queuebindokframe.h b/src/queuebindokframe.h index b80b11b..e4e55e8 100644 --- a/src/queuebindokframe.h +++ b/src/queuebindokframe.h @@ -69,7 +69,7 @@ public: if(!channel) return false; // report to handler - if (channel->reportSuccess()) channel->onSynchronized(); + channel->reportSuccess(); // done return true; diff --git a/src/queuedeclareokframe.h b/src/queuedeclareokframe.h index 99ad1e8..ca545e9 100644 --- a/src/queuedeclareokframe.h +++ b/src/queuedeclareokframe.h @@ -133,7 +133,7 @@ public: if (!channel) return false; // report success - if (channel->reportSuccess(name(), messageCount(), consumerCount())) channel->onSynchronized(); + channel->reportSuccess(name(), messageCount(), consumerCount()); // done return true; diff --git a/src/queuedeleteokframe.h b/src/queuedeleteokframe.h index e8bc711..b7ece3f 100644 --- a/src/queuedeleteokframe.h +++ b/src/queuedeleteokframe.h @@ -94,7 +94,7 @@ public: if(!channel) return false; // report queue deletion success - if (channel->reportSuccess(this->messageCount())) channel->onSynchronized(); + channel->reportSuccess(this->messageCount()); // done return true; diff --git a/src/queuepurgeokframe.h b/src/queuepurgeokframe.h index 22cf119..9098a76 100644 --- a/src/queuepurgeokframe.h +++ b/src/queuepurgeokframe.h @@ -94,7 +94,7 @@ public: if(!channel) return false; // report queue purge success - if (channel->reportSuccess(this->messageCount())) channel->onSynchronized(); + channel->reportSuccess(this->messageCount()); // done return true; diff --git a/src/queueunbindokframe.h b/src/queueunbindokframe.h index 893908d..d57cff0 100644 --- a/src/queueunbindokframe.h +++ b/src/queueunbindokframe.h @@ -73,7 +73,7 @@ public: if(!channel) return false; // report queue unbind success - if (channel->reportSuccess()) channel->onSynchronized(); + channel->reportSuccess(); // done return true; diff --git a/src/transactioncommitokframe.h b/src/transactioncommitokframe.h index a02857d..0b4bbfb 100644 --- a/src/transactioncommitokframe.h +++ b/src/transactioncommitokframe.h @@ -74,7 +74,7 @@ public: if(!channel) return false; // report that the channel is open - if (channel->reportSuccess()) channel->onSynchronized(); + channel->reportSuccess(); // done return true; diff --git a/src/transactionrollbackokframe.h b/src/transactionrollbackokframe.h index d04532f..ecbb98b 100644 --- a/src/transactionrollbackokframe.h +++ b/src/transactionrollbackokframe.h @@ -74,7 +74,7 @@ public: if(!channel) return false; // report that the channel is open - if (channel->reportSuccess()) channel->onSynchronized(); + channel->reportSuccess(); // done return true; diff --git a/src/transactionselectokframe.h b/src/transactionselectokframe.h index f83e803..35737e4 100644 --- a/src/transactionselectokframe.h +++ b/src/transactionselectokframe.h @@ -74,7 +74,7 @@ public: if(!channel) return false; // report that the channel is open - if (channel->reportSuccess()) channel->onSynchronized(); + channel->reportSuccess(); // done return true;