fix for incorrect handling of synchronous flag

This commit is contained in:
Michael van der Werve 2019-06-19 09:38:25 +02:00
parent 19d82ed1f2
commit c82fce8ec9
20 changed files with 34 additions and 21 deletions

View File

@ -645,8 +645,17 @@ public:
// skip if there is no oldest callback
if (!_oldestCallback) return true;
// the last (possibly synchronous) operation was received, so we're no longer in synchronous mode
if (_synchronous && _queue.empty()) _synchronous = false;
// remember whether or not we were synchronous
bool synchronous = _synchronous;
// is the queue empty at this moment?
bool empty = _queue.empty();
// the last (possibly synchronous) operation was received, so we're no longer in synchronous mode. this
// is an optimization that makes sure that the first instruction _after_ a synchronous instruction
// that is installed during the success callback we make later does _not_ need to be buffered, but can be
// sent directly
if (synchronous && empty) _synchronous = false;
// we are going to call callbacks that could destruct the channel
Monitor monitor(this);
@ -661,6 +670,10 @@ public:
// leap out if channel no longer exists
if (!monitor.valid()) return false;
// if we were synchronous, but there were still messages in the queue, we process the queue now, because the synchronous
// operation was finished, and its callback was made, which means we're no longer in synchronous mode
if (synchronous && !empty) onSynchronized();
// set the oldest callback
_oldestCallback = next;

View File

@ -94,7 +94,7 @@ public:
if (!channel) return false;
// report
if (channel->reportSuccess<const std::string&>(consumerTag())) channel->onSynchronized();
channel->reportSuccess<const std::string&>(consumerTag());
// done
return true;

View File

@ -94,7 +94,7 @@ public:
if (!channel) return false;
// report
if (channel->reportSuccess(consumerTag())) channel->onSynchronized();
channel->reportSuccess(consumerTag());
// done
return true;

View File

@ -84,7 +84,7 @@ public:
if (!channel) return false;
// report
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();
// done
return true;

View File

@ -67,7 +67,7 @@ public:
if (!channel) return false;
// report
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();
// done
return true;

View File

@ -72,7 +72,7 @@ public:
if (!channel) return false;
// report that the channel is closed
if (channel->reportClosed()) channel->onSynchronized();
channel->reportClosed();
// done
return true;

View File

@ -93,7 +93,7 @@ public:
if (!channel) return false;
// report success for the call
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();
// done
return true;

View File

@ -75,7 +75,7 @@ public:
if(!channel) return false;
// report that the channel is open
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();
// done
return true;

View File

@ -67,7 +67,7 @@ public:
if(!channel) return false;
// report to handler
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();
// done
return true;

View File

@ -70,7 +70,7 @@ public:
if(!channel) return false;
// report exchange declare ok
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();
// done
return true;

View File

@ -71,7 +71,7 @@ public:
if(!channel) return false;
// report to handler
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();
// done
return true;

View File

@ -68,7 +68,7 @@ public:
if(!channel) return false;
// report to handler
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();
// done
return true;

View File

@ -69,7 +69,7 @@ public:
if(!channel) return false;
// report to handler
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();
// done
return true;

View File

@ -133,7 +133,7 @@ public:
if (!channel) return false;
// report success
if (channel->reportSuccess(name(), messageCount(), consumerCount())) channel->onSynchronized();
channel->reportSuccess(name(), messageCount(), consumerCount());
// done
return true;

View File

@ -94,7 +94,7 @@ public:
if(!channel) return false;
// report queue deletion success
if (channel->reportSuccess(this->messageCount())) channel->onSynchronized();
channel->reportSuccess(this->messageCount());
// done
return true;

View File

@ -94,7 +94,7 @@ public:
if(!channel) return false;
// report queue purge success
if (channel->reportSuccess(this->messageCount())) channel->onSynchronized();
channel->reportSuccess(this->messageCount());
// done
return true;

View File

@ -73,7 +73,7 @@ public:
if(!channel) return false;
// report queue unbind success
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();
// done
return true;

View File

@ -74,7 +74,7 @@ public:
if(!channel) return false;
// report that the channel is open
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();
// done
return true;

View File

@ -74,7 +74,7 @@ public:
if(!channel) return false;
// report that the channel is open
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();
// done
return true;

View File

@ -74,7 +74,7 @@ public:
if(!channel) return false;
// report that the channel is open
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();
// done
return true;