diff --git a/src/amqp_channel.cpp b/src/amqp_channel.cpp index 12e2d64..675a7aa 100644 --- a/src/amqp_channel.cpp +++ b/src/amqp_channel.cpp @@ -13,6 +13,10 @@ ChannelPrivate::ChannelPrivate(Channel *q) : channelNumber(0), opened(false), needOpen(true), + prefetchSize(0), + requestedPrefetchSize(0), + prefetchCount(0), + requestedPrefetchCount(0), error(QAMQP::NoError), q_ptr(q) { @@ -55,6 +59,15 @@ bool ChannelPrivate::_q_method(const Frame::Method &frame) if (frame.channel() != channelNumber) return true; + if (frame.methodClass() == Frame::fcBasic) { + if (frame.id() == bmQosOk) { + qosOk(frame); + return true; + } + + return false; + } + if (frame.methodClass() != Frame::fcChannel) return false; @@ -88,8 +101,12 @@ void ChannelPrivate::_q_open() void ChannelPrivate::sendFrame(const Frame::Base &frame) { - if (client) - client->d_func()->sendFrame(frame); + if (!client) { + qAmqpDebug() << Q_FUNC_INFO << "invalid client"; + return; + } + + client->d_func()->sendFrame(frame); } void ChannelPrivate::open() @@ -203,20 +220,22 @@ void ChannelPrivate::openOk(const Frame::Method &frame) q->channelOpened(); } -void ChannelPrivate::setQOS(qint32 prefetchSize, quint16 prefetchCount) -{ - Q_UNUSED(prefetchSize) - Q_UNUSED(prefetchCount) - qAmqpDebug() << Q_FUNC_INFO << "temporarily disabled"; -// client_->d_func()->connection_->d_func()->setQOS(prefetchSize, prefetchCount, channelNumber, false); -} - void ChannelPrivate::_q_disconnected() { nextChannelNumber = 0; opened = false; } +void ChannelPrivate::qosOk(const Frame::Method &frame) +{ + Q_Q(Channel); + Q_UNUSED(frame) + + prefetchCount = requestedPrefetchCount; + prefetchSize = requestedPrefetchSize; + Q_EMIT q->qosDefined(); +} + ////////////////////////////////////////////////////////////////////////// Channel::Channel(int channelNumber, Client *client) @@ -276,10 +295,36 @@ bool Channel::isOpened() const return d->opened; } -void Channel::setQOS(qint32 prefetchSize, quint16 prefetchCount) +void Channel::qos(qint16 prefetchCount, qint32 prefetchSize) { Q_D(Channel); - d->setQOS(prefetchSize, prefetchCount); + Frame::Method frame(Frame::fcBasic, ChannelPrivate::bmQos); + frame.setChannel(d->channelNumber); + + QByteArray arguments; + QDataStream stream(&arguments, QIODevice::WriteOnly); + + d->requestedPrefetchSize = prefetchSize; + d->requestedPrefetchCount = prefetchCount; + + stream << qint32(prefetchSize); + stream << qint16(prefetchCount); + stream << qint8(0x0); // global + + frame.setArguments(arguments); + d->sendFrame(frame); +} + +qint32 Channel::prefetchSize() const +{ + Q_D(const Channel); + return d->prefetchSize; +} + +qint16 Channel::prefetchCount() const +{ + Q_D(const Channel); + return d->prefetchCount; } Error Channel::error() const diff --git a/src/amqp_channel.h b/src/amqp_channel.h index da3c7dd..173eb5f 100644 --- a/src/amqp_channel.h +++ b/src/amqp_channel.h @@ -27,16 +27,22 @@ public: Error error() const; QString errorString() const; + qint32 prefetchSize() const; + qint16 prefetchCount() const; + + // AMQP Basic + void qos(qint16 prefetchCount, qint32 prefetchSize = 0); + public Q_SLOTS: void closeChannel(); void reopen(); - void setQOS(qint32 prefetchSize, quint16 prefetchCount); Q_SIGNALS: void opened(); void closed(); void flowChanged(bool enabled); void error(QAMQP::Error error); + void qosDefined(); protected: virtual void channelOpened() = 0; diff --git a/src/amqp_channel_p.h b/src/amqp_channel_p.h index 6f2f244..1f97bca 100644 --- a/src/amqp_channel_p.h +++ b/src/amqp_channel_p.h @@ -49,7 +49,6 @@ public: void init(int channel, Client *client); void stateChanged(State state); - void setQOS(qint32 prefetchSize, quint16 prefetchCount); void sendFrame(const Frame::Base &frame); void open(); @@ -65,6 +64,7 @@ public: void flowOk(const Frame::Method &frame); void close(const Frame::Method &frame); void closeOk(const Frame::Method &frame); + void qosOk(const Frame::Method &frame); // private slots virtual void _q_disconnected(); @@ -77,6 +77,11 @@ public: bool opened; bool needOpen; + qint32 prefetchSize; + qint32 requestedPrefetchSize; + qint16 prefetchCount; + qint16 requestedPrefetchCount; + Error error; QString errorString; diff --git a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp index 6cdc779..28365d6 100644 --- a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp +++ b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp @@ -37,6 +37,9 @@ private Q_SLOTS: void getEmpty(); void get(); void verifyContentEncodingIssue33(); + void defineQos(); + void invalidQos(); + void qos(); private: void declareQueueAndVerifyConsuming(Queue *queue); @@ -400,5 +403,70 @@ void tst_QAMQPQueue::verifyContentEncodingIssue33() QCOMPARE(contentType, QLatin1String("fakeContentEncoding")); } +void tst_QAMQPQueue::defineQos() +{ + Queue *queue = client->createQueue("test-define-qos"); + declareQueueAndVerifyConsuming(queue); + + queue->qos(10); + QVERIFY(waitForSignal(queue, SIGNAL(qosDefined()))); + QCOMPARE(queue->prefetchCount(), qint16(10)); + QCOMPARE(queue->prefetchSize(), 0); + + // clean up queue + queue->remove(Queue::roForce); + QVERIFY(waitForSignal(queue, SIGNAL(removed()))); +} + +void tst_QAMQPQueue::invalidQos() +{ + Queue *queue = client->createQueue("test-invalid-define-qos"); + declareQueueAndVerifyConsuming(queue); + + queue->qos(10, 10); + QVERIFY(waitForSignal(client.data(), SIGNAL(error(QAMQP::Error)))); + QCOMPARE(client->error(), QAMQP::NotImplementedError); +} + +void tst_QAMQPQueue::qos() +{ + Queue *queue = client->createQueue("test-qos"); + queue->declare(); + QVERIFY(waitForSignal(queue, SIGNAL(declared()))); + + queue->qos(1); + QVERIFY(waitForSignal(queue, SIGNAL(qosDefined()))); + QCOMPARE(queue->prefetchCount(), qint16(1)); + QCOMPARE(queue->prefetchSize(), 0); + + // load up the queue + const int messageCount = 10; + Exchange *defaultExchange = client->createExchange(); + for (int i = 0; i < messageCount; ++i) { + QString message = QString("message %1").arg(i); + defaultExchange->publish(message, "test-qos"); + } + + QTest::qWait(100); + + // begin consuming, one at a time + QVERIFY(queue->consume()); + QVERIFY(waitForSignal(queue, SIGNAL(consuming(QString)))); + + int messageReceivedCount = 0; + while (!queue->isEmpty()) { + QString expected = QString("message %1").arg(messageReceivedCount); + Message message = queue->dequeue(); + QCOMPARE(message.payload(), expected.toUtf8()); + queue->ack(message); + messageReceivedCount++; + + if (messageReceivedCount < messageCount) + QVERIFY(waitForSignal(queue, SIGNAL(messageReceived()))); + } + + QCOMPARE(messageReceivedCount, messageCount); +} + QTEST_MAIN(tst_QAMQPQueue) #include "tst_qamqpqueue.moc"