From 12a03f959f275a6224d6bec02d77132987d1f6c5 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Thu, 26 Jun 2014 10:28:58 -0400 Subject: [PATCH] add qos support to channel AMQP Basic qos support added to Channel class. There is an option for global qos which is currently not added. Also, added auto tests for qos definition, proper qos operation and verifying that RabbitMQ does not support prefetchSize. --- src/amqp_channel.cpp | 69 +++++++++++++++++++----- src/amqp_channel.h | 8 ++- src/amqp_channel_p.h | 7 ++- tests/auto/qamqpqueue/tst_qamqpqueue.cpp | 68 +++++++++++++++++++++++ 4 files changed, 138 insertions(+), 14 deletions(-) diff --git a/src/amqp_channel.cpp b/src/amqp_channel.cpp index 12e2d64..675a7aa 100644 --- a/src/amqp_channel.cpp +++ b/src/amqp_channel.cpp @@ -13,6 +13,10 @@ ChannelPrivate::ChannelPrivate(Channel *q) : channelNumber(0), opened(false), needOpen(true), + prefetchSize(0), + requestedPrefetchSize(0), + prefetchCount(0), + requestedPrefetchCount(0), error(QAMQP::NoError), q_ptr(q) { @@ -55,6 +59,15 @@ bool ChannelPrivate::_q_method(const Frame::Method &frame) if (frame.channel() != channelNumber) return true; + if (frame.methodClass() == Frame::fcBasic) { + if (frame.id() == bmQosOk) { + qosOk(frame); + return true; + } + + return false; + } + if (frame.methodClass() != Frame::fcChannel) return false; @@ -88,8 +101,12 @@ void ChannelPrivate::_q_open() void ChannelPrivate::sendFrame(const Frame::Base &frame) { - if (client) - client->d_func()->sendFrame(frame); + if (!client) { + qAmqpDebug() << Q_FUNC_INFO << "invalid client"; + return; + } + + client->d_func()->sendFrame(frame); } void ChannelPrivate::open() @@ -203,20 +220,22 @@ void ChannelPrivate::openOk(const Frame::Method &frame) q->channelOpened(); } -void ChannelPrivate::setQOS(qint32 prefetchSize, quint16 prefetchCount) -{ - Q_UNUSED(prefetchSize) - Q_UNUSED(prefetchCount) - qAmqpDebug() << Q_FUNC_INFO << "temporarily disabled"; -// client_->d_func()->connection_->d_func()->setQOS(prefetchSize, prefetchCount, channelNumber, false); -} - void ChannelPrivate::_q_disconnected() { nextChannelNumber = 0; opened = false; } +void ChannelPrivate::qosOk(const Frame::Method &frame) +{ + Q_Q(Channel); + Q_UNUSED(frame) + + prefetchCount = requestedPrefetchCount; + prefetchSize = requestedPrefetchSize; + Q_EMIT q->qosDefined(); +} + ////////////////////////////////////////////////////////////////////////// Channel::Channel(int channelNumber, Client *client) @@ -276,10 +295,36 @@ bool Channel::isOpened() const return d->opened; } -void Channel::setQOS(qint32 prefetchSize, quint16 prefetchCount) +void Channel::qos(qint16 prefetchCount, qint32 prefetchSize) { Q_D(Channel); - d->setQOS(prefetchSize, prefetchCount); + Frame::Method frame(Frame::fcBasic, ChannelPrivate::bmQos); + frame.setChannel(d->channelNumber); + + QByteArray arguments; + QDataStream stream(&arguments, QIODevice::WriteOnly); + + d->requestedPrefetchSize = prefetchSize; + d->requestedPrefetchCount = prefetchCount; + + stream << qint32(prefetchSize); + stream << qint16(prefetchCount); + stream << qint8(0x0); // global + + frame.setArguments(arguments); + d->sendFrame(frame); +} + +qint32 Channel::prefetchSize() const +{ + Q_D(const Channel); + return d->prefetchSize; +} + +qint16 Channel::prefetchCount() const +{ + Q_D(const Channel); + return d->prefetchCount; } Error Channel::error() const diff --git a/src/amqp_channel.h b/src/amqp_channel.h index da3c7dd..173eb5f 100644 --- a/src/amqp_channel.h +++ b/src/amqp_channel.h @@ -27,16 +27,22 @@ public: Error error() const; QString errorString() const; + qint32 prefetchSize() const; + qint16 prefetchCount() const; + + // AMQP Basic + void qos(qint16 prefetchCount, qint32 prefetchSize = 0); + public Q_SLOTS: void closeChannel(); void reopen(); - void setQOS(qint32 prefetchSize, quint16 prefetchCount); Q_SIGNALS: void opened(); void closed(); void flowChanged(bool enabled); void error(QAMQP::Error error); + void qosDefined(); protected: virtual void channelOpened() = 0; diff --git a/src/amqp_channel_p.h b/src/amqp_channel_p.h index 6f2f244..1f97bca 100644 --- a/src/amqp_channel_p.h +++ b/src/amqp_channel_p.h @@ -49,7 +49,6 @@ public: void init(int channel, Client *client); void stateChanged(State state); - void setQOS(qint32 prefetchSize, quint16 prefetchCount); void sendFrame(const Frame::Base &frame); void open(); @@ -65,6 +64,7 @@ public: void flowOk(const Frame::Method &frame); void close(const Frame::Method &frame); void closeOk(const Frame::Method &frame); + void qosOk(const Frame::Method &frame); // private slots virtual void _q_disconnected(); @@ -77,6 +77,11 @@ public: bool opened; bool needOpen; + qint32 prefetchSize; + qint32 requestedPrefetchSize; + qint16 prefetchCount; + qint16 requestedPrefetchCount; + Error error; QString errorString; diff --git a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp index 6cdc779..28365d6 100644 --- a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp +++ b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp @@ -37,6 +37,9 @@ private Q_SLOTS: void getEmpty(); void get(); void verifyContentEncodingIssue33(); + void defineQos(); + void invalidQos(); + void qos(); private: void declareQueueAndVerifyConsuming(Queue *queue); @@ -400,5 +403,70 @@ void tst_QAMQPQueue::verifyContentEncodingIssue33() QCOMPARE(contentType, QLatin1String("fakeContentEncoding")); } +void tst_QAMQPQueue::defineQos() +{ + Queue *queue = client->createQueue("test-define-qos"); + declareQueueAndVerifyConsuming(queue); + + queue->qos(10); + QVERIFY(waitForSignal(queue, SIGNAL(qosDefined()))); + QCOMPARE(queue->prefetchCount(), qint16(10)); + QCOMPARE(queue->prefetchSize(), 0); + + // clean up queue + queue->remove(Queue::roForce); + QVERIFY(waitForSignal(queue, SIGNAL(removed()))); +} + +void tst_QAMQPQueue::invalidQos() +{ + Queue *queue = client->createQueue("test-invalid-define-qos"); + declareQueueAndVerifyConsuming(queue); + + queue->qos(10, 10); + QVERIFY(waitForSignal(client.data(), SIGNAL(error(QAMQP::Error)))); + QCOMPARE(client->error(), QAMQP::NotImplementedError); +} + +void tst_QAMQPQueue::qos() +{ + Queue *queue = client->createQueue("test-qos"); + queue->declare(); + QVERIFY(waitForSignal(queue, SIGNAL(declared()))); + + queue->qos(1); + QVERIFY(waitForSignal(queue, SIGNAL(qosDefined()))); + QCOMPARE(queue->prefetchCount(), qint16(1)); + QCOMPARE(queue->prefetchSize(), 0); + + // load up the queue + const int messageCount = 10; + Exchange *defaultExchange = client->createExchange(); + for (int i = 0; i < messageCount; ++i) { + QString message = QString("message %1").arg(i); + defaultExchange->publish(message, "test-qos"); + } + + QTest::qWait(100); + + // begin consuming, one at a time + QVERIFY(queue->consume()); + QVERIFY(waitForSignal(queue, SIGNAL(consuming(QString)))); + + int messageReceivedCount = 0; + while (!queue->isEmpty()) { + QString expected = QString("message %1").arg(messageReceivedCount); + Message message = queue->dequeue(); + QCOMPARE(message.payload(), expected.toUtf8()); + queue->ack(message); + messageReceivedCount++; + + if (messageReceivedCount < messageCount) + QVERIFY(waitForSignal(queue, SIGNAL(messageReceived()))); + } + + QCOMPARE(messageReceivedCount, messageCount); +} + QTEST_MAIN(tst_QAMQPQueue) #include "tst_qamqpqueue.moc"