add support for cancelling a consumer

cancel method and cancelled signal added to Queue, added auto tests
to guarantee conditions surrounding the cancel process
This commit is contained in:
Matt Broadstone 2014-06-24 11:09:19 -04:00
parent c43c2d6788
commit cab23c97cc
4 changed files with 105 additions and 5 deletions

View File

@ -66,6 +66,9 @@ bool QueuePrivate::_q_method(const Frame::Method &frame)
case bmGetEmpty:
Q_EMIT q->empty();
break;
case bmCancelOk:
cancelOk(frame);
break;
}
return true;
@ -240,6 +243,21 @@ void QueuePrivate::declare()
delayedDeclare = false;
}
void QueuePrivate::cancelOk(const Frame::Method &frame)
{
Q_Q(Queue);
qAmqpDebug() << Q_FUNC_INFO;
QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly);
QString consumer = Frame::readField('s',in).toString();
if (consumerTag != consumer) {
qAmqpDebug() << Q_FUNC_INFO << "invalid consumer tag: " << consumer;
return;
}
consumerTag.clear();
Q_EMIT q->cancelled(consumer);
}
//////////////////////////////////////////////////////////////////////////
@ -506,3 +524,30 @@ void Queue::ack(const Message &message)
frame.setArguments(arguments);
d->sendFrame(frame);
}
bool Queue::cancel(bool noWait)
{
Q_D(Queue);
if (!d->consuming) {
qAmqpDebug() << Q_FUNC_INFO << "not consuming!";
return false;
}
if (d->consumerTag.isEmpty()) {
qAmqpDebug() << Q_FUNC_INFO << "consuming with an empty consumer tag, failing...";
return false;
}
Frame::Method frame(Frame::fcBasic, QueuePrivate::bmCancel);
frame.setChannel(d->channelNumber);
QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly);
Frame::writeField('s', out, d->consumerTag);
out << (noWait ? qint8(0x01) : qint8(0x0));
frame.setArguments(arguments);
d->sendFrame(frame);
return true;
}

View File

@ -76,6 +76,7 @@ public:
bool consume(int options = NoOptions);
void get();
void ack(const Message &message);
bool cancel(bool noWait = false);
Q_SIGNALS:
void declared();
@ -86,6 +87,7 @@ Q_SIGNALS:
void empty();
void purged(int messageCount);
void consuming(const QString &consumerTag);
void cancelled(const QString &consumerTag);
protected:
// reimp Channel

View File

@ -26,19 +26,22 @@ public:
~QueuePrivate();
void declare();
// method handler related
virtual bool _q_method(const Frame::Method &frame);
virtual void _q_content(const Frame::Content &frame);
virtual void _q_body(const Frame::ContentBody &frame);
// AMQP Queue method handlers
void declareOk(const Frame::Method &frame);
void deleteOk(const Frame::Method &frame);
void purgeOk(const Frame::Method &frame);
void bindOk(const Frame::Method &frame);
void unbindOk(const Frame::Method &frame);
void getOk(const Frame::Method &frame);
void consumeOk(const Frame::Method &frame);
// AMQP Basic method handlers
virtual void _q_content(const Frame::Content &frame);
virtual void _q_body(const Frame::ContentBody &frame);
void deliver(const Frame::Method &frame);
void getOk(const Frame::Method &frame);
void cancelOk(const Frame::Method &frame);
QString type;
int options;

View File

@ -31,6 +31,9 @@ private Q_SLOTS:
void unbind();
void purge();
void canOnlyStartConsumingOnce();
void cancel();
void invalidCancelBecauseNotConsuming();
void invalidCancelBecauseInvalidConsumerTag();
private:
QScopedPointer<Client> client;
@ -310,10 +313,57 @@ void tst_QAMQPQueue::canOnlyStartConsumingOnce()
QSignalSpy spy(queue, SIGNAL(consuming(QString)));
QVERIFY(waitForSignal(queue, SIGNAL(consuming(QString))));
QVERIFY(queue->isConsuming());
QVERIFY(!spy.isEmpty());
QList<QVariant> arguments = spy.takeFirst();
QCOMPARE(arguments.at(0).toString(), queue->consumerTag());
QCOMPARE(queue->consume(), false);
}
void tst_QAMQPQueue::cancel()
{
Queue *queue = client->createQueue("test-cancel");
queue->declare();
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
QVERIFY(queue->consume());
QSignalSpy spy(queue, SIGNAL(consuming(QString)));
QVERIFY(waitForSignal(queue, SIGNAL(consuming(QString))));
QVERIFY(queue->isConsuming());
QVERIFY(!spy.isEmpty());
QList<QVariant> arguments = spy.takeFirst();
QCOMPARE(arguments.at(0).toString(), queue->consumerTag());
QString consumerTag = queue->consumerTag();
QSignalSpy cancelSpy(queue, SIGNAL(cancelled(QString)));
QVERIFY(queue->cancel());
QVERIFY(waitForSignal(queue, SIGNAL(cancelled(QString))));
QVERIFY(!cancelSpy.isEmpty());
arguments = cancelSpy.takeFirst();
QCOMPARE(arguments.at(0).toString(), consumerTag);
}
void tst_QAMQPQueue::invalidCancelBecauseNotConsuming()
{
Queue *queue = client->createQueue("test-invalid-cancel-because-not-consuming");
queue->declare();
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
QCOMPARE(queue->cancel(), false);
}
void tst_QAMQPQueue::invalidCancelBecauseInvalidConsumerTag()
{
Queue *queue = client->createQueue("test-invalid-cancel-because-invalid-consumer-tag");
queue->declare();
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
QVERIFY(queue->consume());
QSignalSpy spy(queue, SIGNAL(consuming(QString)));
QVERIFY(waitForSignal(queue, SIGNAL(consuming(QString))));
QVERIFY(queue->isConsuming());
QVERIFY(!spy.isEmpty());
QList<QVariant> arguments = spy.takeFirst();
QCOMPARE(arguments.at(0).toString(), queue->consumerTag());
queue->setConsumerTag(QString());
QCOMPARE(queue->cancel(), false);
}
QTEST_MAIN(tst_QAMQPQueue)
#include "tst_qamqpqueue.moc"