add qos support to channel

AMQP Basic qos support added to Channel class. There is an option
for global qos which is currently not added. Also, added auto tests
for qos definition, proper qos operation and verifying that RabbitMQ
does not support prefetchSize.
This commit is contained in:
Matt Broadstone 2014-06-26 10:28:58 -04:00
parent 9bd5610ac7
commit 12a03f959f
4 changed files with 138 additions and 14 deletions

View File

@ -13,6 +13,10 @@ ChannelPrivate::ChannelPrivate(Channel *q)
: channelNumber(0),
opened(false),
needOpen(true),
prefetchSize(0),
requestedPrefetchSize(0),
prefetchCount(0),
requestedPrefetchCount(0),
error(QAMQP::NoError),
q_ptr(q)
{
@ -55,6 +59,15 @@ bool ChannelPrivate::_q_method(const Frame::Method &frame)
if (frame.channel() != channelNumber)
return true;
if (frame.methodClass() == Frame::fcBasic) {
if (frame.id() == bmQosOk) {
qosOk(frame);
return true;
}
return false;
}
if (frame.methodClass() != Frame::fcChannel)
return false;
@ -88,8 +101,12 @@ void ChannelPrivate::_q_open()
void ChannelPrivate::sendFrame(const Frame::Base &frame)
{
if (client)
client->d_func()->sendFrame(frame);
if (!client) {
qAmqpDebug() << Q_FUNC_INFO << "invalid client";
return;
}
client->d_func()->sendFrame(frame);
}
void ChannelPrivate::open()
@ -203,20 +220,22 @@ void ChannelPrivate::openOk(const Frame::Method &frame)
q->channelOpened();
}
void ChannelPrivate::setQOS(qint32 prefetchSize, quint16 prefetchCount)
{
Q_UNUSED(prefetchSize)
Q_UNUSED(prefetchCount)
qAmqpDebug() << Q_FUNC_INFO << "temporarily disabled";
// client_->d_func()->connection_->d_func()->setQOS(prefetchSize, prefetchCount, channelNumber, false);
}
void ChannelPrivate::_q_disconnected()
{
nextChannelNumber = 0;
opened = false;
}
void ChannelPrivate::qosOk(const Frame::Method &frame)
{
Q_Q(Channel);
Q_UNUSED(frame)
prefetchCount = requestedPrefetchCount;
prefetchSize = requestedPrefetchSize;
Q_EMIT q->qosDefined();
}
//////////////////////////////////////////////////////////////////////////
Channel::Channel(int channelNumber, Client *client)
@ -276,10 +295,36 @@ bool Channel::isOpened() const
return d->opened;
}
void Channel::setQOS(qint32 prefetchSize, quint16 prefetchCount)
void Channel::qos(qint16 prefetchCount, qint32 prefetchSize)
{
Q_D(Channel);
d->setQOS(prefetchSize, prefetchCount);
Frame::Method frame(Frame::fcBasic, ChannelPrivate::bmQos);
frame.setChannel(d->channelNumber);
QByteArray arguments;
QDataStream stream(&arguments, QIODevice::WriteOnly);
d->requestedPrefetchSize = prefetchSize;
d->requestedPrefetchCount = prefetchCount;
stream << qint32(prefetchSize);
stream << qint16(prefetchCount);
stream << qint8(0x0); // global
frame.setArguments(arguments);
d->sendFrame(frame);
}
qint32 Channel::prefetchSize() const
{
Q_D(const Channel);
return d->prefetchSize;
}
qint16 Channel::prefetchCount() const
{
Q_D(const Channel);
return d->prefetchCount;
}
Error Channel::error() const

View File

@ -27,16 +27,22 @@ public:
Error error() const;
QString errorString() const;
qint32 prefetchSize() const;
qint16 prefetchCount() const;
// AMQP Basic
void qos(qint16 prefetchCount, qint32 prefetchSize = 0);
public Q_SLOTS:
void closeChannel();
void reopen();
void setQOS(qint32 prefetchSize, quint16 prefetchCount);
Q_SIGNALS:
void opened();
void closed();
void flowChanged(bool enabled);
void error(QAMQP::Error error);
void qosDefined();
protected:
virtual void channelOpened() = 0;

View File

@ -49,7 +49,6 @@ public:
void init(int channel, Client *client);
void stateChanged(State state);
void setQOS(qint32 prefetchSize, quint16 prefetchCount);
void sendFrame(const Frame::Base &frame);
void open();
@ -65,6 +64,7 @@ public:
void flowOk(const Frame::Method &frame);
void close(const Frame::Method &frame);
void closeOk(const Frame::Method &frame);
void qosOk(const Frame::Method &frame);
// private slots
virtual void _q_disconnected();
@ -77,6 +77,11 @@ public:
bool opened;
bool needOpen;
qint32 prefetchSize;
qint32 requestedPrefetchSize;
qint16 prefetchCount;
qint16 requestedPrefetchCount;
Error error;
QString errorString;

View File

@ -37,6 +37,9 @@ private Q_SLOTS:
void getEmpty();
void get();
void verifyContentEncodingIssue33();
void defineQos();
void invalidQos();
void qos();
private:
void declareQueueAndVerifyConsuming(Queue *queue);
@ -400,5 +403,70 @@ void tst_QAMQPQueue::verifyContentEncodingIssue33()
QCOMPARE(contentType, QLatin1String("fakeContentEncoding"));
}
void tst_QAMQPQueue::defineQos()
{
Queue *queue = client->createQueue("test-define-qos");
declareQueueAndVerifyConsuming(queue);
queue->qos(10);
QVERIFY(waitForSignal(queue, SIGNAL(qosDefined())));
QCOMPARE(queue->prefetchCount(), qint16(10));
QCOMPARE(queue->prefetchSize(), 0);
// clean up queue
queue->remove(Queue::roForce);
QVERIFY(waitForSignal(queue, SIGNAL(removed())));
}
void tst_QAMQPQueue::invalidQos()
{
Queue *queue = client->createQueue("test-invalid-define-qos");
declareQueueAndVerifyConsuming(queue);
queue->qos(10, 10);
QVERIFY(waitForSignal(client.data(), SIGNAL(error(QAMQP::Error))));
QCOMPARE(client->error(), QAMQP::NotImplementedError);
}
void tst_QAMQPQueue::qos()
{
Queue *queue = client->createQueue("test-qos");
queue->declare();
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
queue->qos(1);
QVERIFY(waitForSignal(queue, SIGNAL(qosDefined())));
QCOMPARE(queue->prefetchCount(), qint16(1));
QCOMPARE(queue->prefetchSize(), 0);
// load up the queue
const int messageCount = 10;
Exchange *defaultExchange = client->createExchange();
for (int i = 0; i < messageCount; ++i) {
QString message = QString("message %1").arg(i);
defaultExchange->publish(message, "test-qos");
}
QTest::qWait(100);
// begin consuming, one at a time
QVERIFY(queue->consume());
QVERIFY(waitForSignal(queue, SIGNAL(consuming(QString))));
int messageReceivedCount = 0;
while (!queue->isEmpty()) {
QString expected = QString("message %1").arg(messageReceivedCount);
Message message = queue->dequeue();
QCOMPARE(message.payload(), expected.toUtf8());
queue->ack(message);
messageReceivedCount++;
if (messageReceivedCount < messageCount)
QVERIFY(waitForSignal(queue, SIGNAL(messageReceived())));
}
QCOMPARE(messageReceivedCount, messageCount);
}
QTEST_MAIN(tst_QAMQPQueue)
#include "tst_qamqpqueue.moc"