fix(multiple-consume): only send consume message once
Previously if multiple attempts to call consume were made, then QAmqpQueue would send a frame for each of them leading to multiple consumer tags and general confusion. This guards from calling consume multiple times.
This commit is contained in:
parent
5c7eca5f07
commit
943180da27
|
|
@ -17,7 +17,8 @@ QAmqpQueuePrivate::QAmqpQueuePrivate(QAmqpQueue *q)
|
||||||
delayedDeclare(false),
|
delayedDeclare(false),
|
||||||
declared(false),
|
declared(false),
|
||||||
recievingMessage(false),
|
recievingMessage(false),
|
||||||
consuming(false)
|
consuming(false),
|
||||||
|
consumeRequested(false)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -218,6 +219,7 @@ void QAmqpQueuePrivate::consumeOk(const QAmqpMethodFrame &frame)
|
||||||
consumerTag = QAmqpFrame::readAmqpField(stream, QAmqpMetaType::ShortString).toString();
|
consumerTag = QAmqpFrame::readAmqpField(stream, QAmqpMetaType::ShortString).toString();
|
||||||
qAmqpDebug("consumer tag = %s", qPrintable(consumerTag));
|
qAmqpDebug("consumer tag = %s", qPrintable(consumerTag));
|
||||||
consuming = true;
|
consuming = true;
|
||||||
|
consumeRequested = false;
|
||||||
Q_EMIT q->consuming(consumerTag);
|
Q_EMIT q->consuming(consumerTag);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -273,6 +275,8 @@ void QAmqpQueuePrivate::cancelOk(const QAmqpMethodFrame &frame)
|
||||||
}
|
}
|
||||||
|
|
||||||
consumerTag.clear();
|
consumerTag.clear();
|
||||||
|
consuming = false;
|
||||||
|
consumeRequested = false;
|
||||||
Q_EMIT q->cancelled(consumer);
|
Q_EMIT q->cancelled(consumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -445,6 +449,11 @@ bool QAmqpQueue::consume(int options)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (d->consumeRequested) {
|
||||||
|
qAmqpDebug() << Q_FUNC_INFO << "already attempting to consume";
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
if (d->consuming) {
|
if (d->consuming) {
|
||||||
qAmqpDebug() << Q_FUNC_INFO << "already consuming with tag: " << d->consumerTag;
|
qAmqpDebug() << Q_FUNC_INFO << "already consuming with tag: " << d->consumerTag;
|
||||||
return false;
|
return false;
|
||||||
|
|
@ -465,6 +474,7 @@ bool QAmqpQueue::consume(int options)
|
||||||
|
|
||||||
frame.setArguments(arguments);
|
frame.setArguments(arguments);
|
||||||
d->sendFrame(frame);
|
d->sendFrame(frame);
|
||||||
|
d->consumeRequested = true;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -50,6 +50,7 @@ public:
|
||||||
bool recievingMessage;
|
bool recievingMessage;
|
||||||
QAmqpMessage currentMessage;
|
QAmqpMessage currentMessage;
|
||||||
bool consuming;
|
bool consuming;
|
||||||
|
bool consumeRequested;
|
||||||
|
|
||||||
Q_DECLARE_PUBLIC(QAmqpQueue)
|
Q_DECLARE_PUBLIC(QAmqpQueue)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@
|
||||||
|
|
||||||
#include <QtTest/QtTest>
|
#include <QtTest/QtTest>
|
||||||
#include "qamqptestcase.h"
|
#include "qamqptestcase.h"
|
||||||
|
#include "signalspy.h"
|
||||||
|
|
||||||
#include "qamqpclient.h"
|
#include "qamqpclient.h"
|
||||||
#include "qamqpqueue.h"
|
#include "qamqpqueue.h"
|
||||||
|
|
@ -33,6 +34,7 @@ private Q_SLOTS:
|
||||||
void delayedBind();
|
void delayedBind();
|
||||||
void purge();
|
void purge();
|
||||||
void canOnlyStartConsumingOnce();
|
void canOnlyStartConsumingOnce();
|
||||||
|
void ensureConsumeOnlySentOnce();
|
||||||
void cancel();
|
void cancel();
|
||||||
void invalidCancelBecauseNotConsuming();
|
void invalidCancelBecauseNotConsuming();
|
||||||
void invalidCancelBecauseInvalidConsumerTag();
|
void invalidCancelBecauseInvalidConsumerTag();
|
||||||
|
|
@ -346,10 +348,29 @@ void tst_QAMQPQueue::purge()
|
||||||
void tst_QAMQPQueue::canOnlyStartConsumingOnce()
|
void tst_QAMQPQueue::canOnlyStartConsumingOnce()
|
||||||
{
|
{
|
||||||
QAmqpQueue *queue = client->createQueue("test-single-consumer");
|
QAmqpQueue *queue = client->createQueue("test-single-consumer");
|
||||||
|
QSignalSpy spy(queue, SIGNAL(consuming(QString)));
|
||||||
declareQueueAndVerifyConsuming(queue);
|
declareQueueAndVerifyConsuming(queue);
|
||||||
QCOMPARE(queue->consume(), false);
|
QCOMPARE(queue->consume(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tst_QAMQPQueue::ensureConsumeOnlySentOnce()
|
||||||
|
{
|
||||||
|
QAmqpQueue *queue = client->createQueue("test-single-consumer");
|
||||||
|
SignalSpy spy(queue, SIGNAL(consuming(QString)));
|
||||||
|
queue->declare();
|
||||||
|
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
||||||
|
|
||||||
|
// try to consume twice
|
||||||
|
QVERIFY(queue->consume());
|
||||||
|
QCOMPARE(queue->consume(), false);
|
||||||
|
QVERIFY(spy.wait());
|
||||||
|
QCOMPARE(spy.size(), 1);
|
||||||
|
|
||||||
|
// clean up queue
|
||||||
|
queue->remove(QAmqpQueue::roForce);
|
||||||
|
QVERIFY(waitForSignal(queue, SIGNAL(removed())));
|
||||||
|
}
|
||||||
|
|
||||||
void tst_QAMQPQueue::cancel()
|
void tst_QAMQPQueue::cancel()
|
||||||
{
|
{
|
||||||
QAmqpQueue *queue = client->createQueue("test-cancel");
|
QAmqpQueue *queue = client->createQueue("test-cancel");
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue