diff --git a/src/amqp_queue.cpp b/src/amqp_queue.cpp index aff3e51..f5d4040 100644 --- a/src/amqp_queue.cpp +++ b/src/amqp_queue.cpp @@ -66,6 +66,9 @@ bool QueuePrivate::_q_method(const Frame::Method &frame) case bmGetEmpty: Q_EMIT q->empty(); break; + case bmCancelOk: + cancelOk(frame); + break; } return true; @@ -240,6 +243,21 @@ void QueuePrivate::declare() delayedDeclare = false; } +void QueuePrivate::cancelOk(const Frame::Method &frame) +{ + Q_Q(Queue); + qAmqpDebug() << Q_FUNC_INFO; + QByteArray data = frame.arguments(); + QDataStream in(&data, QIODevice::ReadOnly); + QString consumer = Frame::readField('s',in).toString(); + if (consumerTag != consumer) { + qAmqpDebug() << Q_FUNC_INFO << "invalid consumer tag: " << consumer; + return; + } + + consumerTag.clear(); + Q_EMIT q->cancelled(consumer); +} ////////////////////////////////////////////////////////////////////////// @@ -506,3 +524,30 @@ void Queue::ack(const Message &message) frame.setArguments(arguments); d->sendFrame(frame); } + +bool Queue::cancel(bool noWait) +{ + Q_D(Queue); + if (!d->consuming) { + qAmqpDebug() << Q_FUNC_INFO << "not consuming!"; + return false; + } + + if (d->consumerTag.isEmpty()) { + qAmqpDebug() << Q_FUNC_INFO << "consuming with an empty consumer tag, failing..."; + return false; + } + + Frame::Method frame(Frame::fcBasic, QueuePrivate::bmCancel); + frame.setChannel(d->channelNumber); + + QByteArray arguments; + QDataStream out(&arguments, QIODevice::WriteOnly); + + Frame::writeField('s', out, d->consumerTag); + out << (noWait ? qint8(0x01) : qint8(0x0)); + + frame.setArguments(arguments); + d->sendFrame(frame); + return true; +} diff --git a/src/amqp_queue.h b/src/amqp_queue.h index 0041f0c..4bdab71 100644 --- a/src/amqp_queue.h +++ b/src/amqp_queue.h @@ -76,6 +76,7 @@ public: bool consume(int options = NoOptions); void get(); void ack(const Message &message); + bool cancel(bool noWait = false); Q_SIGNALS: void declared(); @@ -86,6 +87,7 @@ Q_SIGNALS: void empty(); void purged(int messageCount); void consuming(const QString &consumerTag); + void cancelled(const QString &consumerTag); protected: // reimp Channel diff --git a/src/amqp_queue_p.h b/src/amqp_queue_p.h index dc675ed..961279b 100644 --- a/src/amqp_queue_p.h +++ b/src/amqp_queue_p.h @@ -26,19 +26,22 @@ public: ~QueuePrivate(); void declare(); - - // method handler related virtual bool _q_method(const Frame::Method &frame); - virtual void _q_content(const Frame::Content &frame); - virtual void _q_body(const Frame::ContentBody &frame); + + // AMQP Queue method handlers void declareOk(const Frame::Method &frame); void deleteOk(const Frame::Method &frame); void purgeOk(const Frame::Method &frame); void bindOk(const Frame::Method &frame); void unbindOk(const Frame::Method &frame); - void getOk(const Frame::Method &frame); void consumeOk(const Frame::Method &frame); + + // AMQP Basic method handlers + virtual void _q_content(const Frame::Content &frame); + virtual void _q_body(const Frame::ContentBody &frame); void deliver(const Frame::Method &frame); + void getOk(const Frame::Method &frame); + void cancelOk(const Frame::Method &frame); QString type; int options; diff --git a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp index 8149ae8..7accb3e 100644 --- a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp +++ b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp @@ -31,6 +31,9 @@ private Q_SLOTS: void unbind(); void purge(); void canOnlyStartConsumingOnce(); + void cancel(); + void invalidCancelBecauseNotConsuming(); + void invalidCancelBecauseInvalidConsumerTag(); private: QScopedPointer client; @@ -310,10 +313,57 @@ void tst_QAMQPQueue::canOnlyStartConsumingOnce() QSignalSpy spy(queue, SIGNAL(consuming(QString))); QVERIFY(waitForSignal(queue, SIGNAL(consuming(QString)))); QVERIFY(queue->isConsuming()); + QVERIFY(!spy.isEmpty()); QList arguments = spy.takeFirst(); QCOMPARE(arguments.at(0).toString(), queue->consumerTag()); QCOMPARE(queue->consume(), false); } +void tst_QAMQPQueue::cancel() +{ + Queue *queue = client->createQueue("test-cancel"); + queue->declare(); + QVERIFY(waitForSignal(queue, SIGNAL(declared()))); + QVERIFY(queue->consume()); + QSignalSpy spy(queue, SIGNAL(consuming(QString))); + QVERIFY(waitForSignal(queue, SIGNAL(consuming(QString)))); + QVERIFY(queue->isConsuming()); + QVERIFY(!spy.isEmpty()); + QList arguments = spy.takeFirst(); + QCOMPARE(arguments.at(0).toString(), queue->consumerTag()); + + QString consumerTag = queue->consumerTag(); + QSignalSpy cancelSpy(queue, SIGNAL(cancelled(QString))); + QVERIFY(queue->cancel()); + QVERIFY(waitForSignal(queue, SIGNAL(cancelled(QString)))); + QVERIFY(!cancelSpy.isEmpty()); + arguments = cancelSpy.takeFirst(); + QCOMPARE(arguments.at(0).toString(), consumerTag); +} + +void tst_QAMQPQueue::invalidCancelBecauseNotConsuming() +{ + Queue *queue = client->createQueue("test-invalid-cancel-because-not-consuming"); + queue->declare(); + QVERIFY(waitForSignal(queue, SIGNAL(declared()))); + QCOMPARE(queue->cancel(), false); +} + +void tst_QAMQPQueue::invalidCancelBecauseInvalidConsumerTag() +{ + Queue *queue = client->createQueue("test-invalid-cancel-because-invalid-consumer-tag"); + queue->declare(); + QVERIFY(waitForSignal(queue, SIGNAL(declared()))); + QVERIFY(queue->consume()); + QSignalSpy spy(queue, SIGNAL(consuming(QString))); + QVERIFY(waitForSignal(queue, SIGNAL(consuming(QString)))); + QVERIFY(queue->isConsuming()); + QVERIFY(!spy.isEmpty()); + QList arguments = spy.takeFirst(); + QCOMPARE(arguments.at(0).toString(), queue->consumerTag()); + queue->setConsumerTag(QString()); + QCOMPARE(queue->cancel(), false); +} + QTEST_MAIN(tst_QAMQPQueue) #include "tst_qamqpqueue.moc"