diff --git a/src/qamqpqueue.cpp b/src/qamqpqueue.cpp index f429027..0eadc41 100644 --- a/src/qamqpqueue.cpp +++ b/src/qamqpqueue.cpp @@ -17,7 +17,8 @@ QAmqpQueuePrivate::QAmqpQueuePrivate(QAmqpQueue *q) delayedDeclare(false), declared(false), recievingMessage(false), - consuming(false) + consuming(false), + consumeRequested(false) { } @@ -218,6 +219,7 @@ void QAmqpQueuePrivate::consumeOk(const QAmqpMethodFrame &frame) consumerTag = QAmqpFrame::readAmqpField(stream, QAmqpMetaType::ShortString).toString(); qAmqpDebug("consumer tag = %s", qPrintable(consumerTag)); consuming = true; + consumeRequested = false; Q_EMIT q->consuming(consumerTag); } @@ -273,6 +275,8 @@ void QAmqpQueuePrivate::cancelOk(const QAmqpMethodFrame &frame) } consumerTag.clear(); + consuming = false; + consumeRequested = false; Q_EMIT q->cancelled(consumer); } @@ -445,6 +449,11 @@ bool QAmqpQueue::consume(int options) return false; } + if (d->consumeRequested) { + qAmqpDebug() << Q_FUNC_INFO << "already attempting to consume"; + return false; + } + if (d->consuming) { qAmqpDebug() << Q_FUNC_INFO << "already consuming with tag: " << d->consumerTag; return false; @@ -465,6 +474,7 @@ bool QAmqpQueue::consume(int options) frame.setArguments(arguments); d->sendFrame(frame); + d->consumeRequested = true; return true; } diff --git a/src/qamqpqueue_p.h b/src/qamqpqueue_p.h index 8a017cc..eceffed 100644 --- a/src/qamqpqueue_p.h +++ b/src/qamqpqueue_p.h @@ -50,6 +50,7 @@ public: bool recievingMessage; QAmqpMessage currentMessage; bool consuming; + bool consumeRequested; Q_DECLARE_PUBLIC(QAmqpQueue) diff --git a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp index f4ee397..428b9f6 100644 --- a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp +++ b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp @@ -4,6 +4,7 @@ #include #include "qamqptestcase.h" +#include "signalspy.h" #include "qamqpclient.h" #include "qamqpqueue.h" @@ -33,6 +34,7 @@ private Q_SLOTS: void delayedBind(); void purge(); void canOnlyStartConsumingOnce(); + void ensureConsumeOnlySentOnce(); void cancel(); void invalidCancelBecauseNotConsuming(); void invalidCancelBecauseInvalidConsumerTag(); @@ -346,10 +348,29 @@ void tst_QAMQPQueue::purge() void tst_QAMQPQueue::canOnlyStartConsumingOnce() { QAmqpQueue *queue = client->createQueue("test-single-consumer"); + QSignalSpy spy(queue, SIGNAL(consuming(QString))); declareQueueAndVerifyConsuming(queue); QCOMPARE(queue->consume(), false); } +void tst_QAMQPQueue::ensureConsumeOnlySentOnce() +{ + QAmqpQueue *queue = client->createQueue("test-single-consumer"); + SignalSpy spy(queue, SIGNAL(consuming(QString))); + queue->declare(); + QVERIFY(waitForSignal(queue, SIGNAL(declared()))); + + // try to consume twice + QVERIFY(queue->consume()); + QCOMPARE(queue->consume(), false); + QVERIFY(spy.wait()); + QCOMPARE(spy.size(), 1); + + // clean up queue + queue->remove(QAmqpQueue::roForce); + QVERIFY(waitForSignal(queue, SIGNAL(removed()))); +} + void tst_QAMQPQueue::cancel() { QAmqpQueue *queue = client->createQueue("test-cancel");