From 1ebe3bd667f39e0d9779ab7d3a151f0ee43cb16c Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Thu, 7 Aug 2014 13:52:09 -0400 Subject: [PATCH] implement flow frame handling support Fleshed out the previously unimplemented flow control frame handling, added an auto test for it. Also refactored out the stateChanged ChannelPrivate method, if we need proper state support in the future it will be added with a better implementation --- src/amqp_channel.cpp | 62 ++++++++++++------------ src/amqp_channel.h | 4 +- src/amqp_channel_p.h | 11 +---- tests/auto/qamqpqueue/tst_qamqpqueue.cpp | 11 +++++ 4 files changed, 46 insertions(+), 42 deletions(-) diff --git a/src/amqp_channel.cpp b/src/amqp_channel.cpp index 9da6da2..0e98a72 100644 --- a/src/amqp_channel.cpp +++ b/src/amqp_channel.cpp @@ -34,25 +34,6 @@ void ChannelPrivate::init(int channel, Client *c) nextChannelNumber = qMax(channelNumber, (nextChannelNumber + 1)); } -void ChannelPrivate::stateChanged(State state) -{ - Q_Q(Channel); - switch(ChannelPrivate::State(state)) { - case ChannelPrivate::csOpened: - Q_EMIT q->opened(); - break; - case ChannelPrivate::csClosed: - Q_EMIT q->closed(); - break; - case ChannelPrivate::csIdle: - Q_EMIT q->flowChanged(false); - break; - case ChannelPrivate::csRunning: - Q_EMIT q->flowChanged(true); - break; - } -} - bool ChannelPrivate::_q_method(const Frame::Method &frame) { Q_ASSERT(frame.channel() == channelNumber); @@ -129,10 +110,21 @@ void ChannelPrivate::open() sendFrame(frame); } -void ChannelPrivate::flow() +void ChannelPrivate::flow(bool active) { + QByteArray arguments; + QDataStream stream(&arguments, QIODevice::WriteOnly); + Frame::writeAmqpField(stream, ShortShortUint, (active ? 1 : 0)); + + Frame::Method frame(Frame::fcChannel, miFlow); + frame.setChannel(channelNumber); + frame.setArguments(arguments); + sendFrame(frame); } +// NOTE: not implemented until I can figure out a good way to force the server +// to pause the channel in a test. It seems like RabbitMQ just doesn't +// care about flow control, preferring rather to use basic.qos void ChannelPrivate::flow(const Frame::Method &frame) { Q_UNUSED(frame); @@ -146,8 +138,14 @@ void ChannelPrivate::flowOk() void ChannelPrivate::flowOk(const Frame::Method &frame) { - Q_UNUSED(frame); - qAmqpDebug() << Q_FUNC_INFO; + Q_Q(Channel); + QByteArray data = frame.arguments(); + QDataStream stream(&data, QIODevice::ReadOnly); + bool active = Frame::readAmqpField(stream, Boolean).toBool(); + if (active) + Q_EMIT q->resumed(); + else + Q_EMIT q->paused(); } void ChannelPrivate::close(int code, const QString &text, int classId, int methodId) @@ -176,7 +174,6 @@ void ChannelPrivate::close(const Frame::Method &frame) { Q_Q(Channel); qAmqpDebug(">> CLOSE"); - stateChanged(csClosed); QByteArray data = frame.arguments(); QDataStream stream(&data, QIODevice::ReadOnly); qint16 code = 0, classId, methodId; @@ -197,6 +194,7 @@ void ChannelPrivate::close(const Frame::Method &frame) qAmqpDebug(">> text: %s", qPrintable(text)); qAmqpDebug(">> class-id: %d", classId); qAmqpDebug(">> method-id: %d", methodId); + Q_EMIT q->closed(); } void ChannelPrivate::closeOk() @@ -205,24 +203,20 @@ void ChannelPrivate::closeOk() sendFrame(frame); } -void ChannelPrivate::closeOk(const Frame::Method &frame) +void ChannelPrivate::closeOk(const Frame::Method &) { - Q_UNUSED(frame) Q_Q(Channel); - - stateChanged(csClosed); + Q_EMIT q->closed(); q->channelClosed(); opened = false; } -void ChannelPrivate::openOk(const Frame::Method &frame) +void ChannelPrivate::openOk(const Frame::Method &) { - Q_UNUSED(frame) Q_Q(Channel); - qAmqpDebug(">> OpenOK"); opened = true; - stateChanged(csOpened); + Q_EMIT q->opened(); q->channelOpened(); } @@ -346,4 +340,10 @@ QString Channel::errorString() const return d->errorString; } +void Channel::resume() +{ + Q_D(Channel); + d->flow(true); +} + #include "moc_amqp_channel.cpp" diff --git a/src/amqp_channel.h b/src/amqp_channel.h index 173eb5f..8dedb4b 100644 --- a/src/amqp_channel.h +++ b/src/amqp_channel.h @@ -36,11 +36,13 @@ public: public Q_SLOTS: void closeChannel(); void reopen(); + void resume(); Q_SIGNALS: void opened(); void closed(); - void flowChanged(bool enabled); + void resumed(); + void paused(); void error(QAMQP::Error error); void qosDefined(); diff --git a/src/amqp_channel_p.h b/src/amqp_channel_p.h index 1f97bca..55ff08d 100644 --- a/src/amqp_channel_p.h +++ b/src/amqp_channel_p.h @@ -21,13 +21,6 @@ public: METHOD_ID_ENUM(miClose, 40) }; - enum State { - csOpened, - csClosed, - csIdle, - csRunning - }; - enum BasicMethod { METHOD_ID_ENUM(bmQos, 10), METHOD_ID_ENUM(bmConsume, 20), @@ -47,12 +40,10 @@ public: virtual ~ChannelPrivate(); void init(int channel, Client *client); - void stateChanged(State state); - void sendFrame(const Frame::Base &frame); void open(); - void flow(); + void flow(bool active); void flowOk(); void close(int code, const QString &text, int classId, int methodId); void closeOk(); diff --git a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp index 0c66320..b7dfa82 100644 --- a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp +++ b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp @@ -47,6 +47,7 @@ private Q_SLOTS: void tableFieldDataTypes(); void messageProperties(); void closeChannel(); + void resumeChannel(); private: void declareQueueAndVerifyConsuming(Queue *queue); @@ -675,5 +676,15 @@ void tst_QAMQPQueue::closeChannel() QVERIFY(waitForSignal(queue, SIGNAL(closed()))); } +void tst_QAMQPQueue::resumeChannel() +{ + Queue *queue = client->createQueue("test-resume"); + QVERIFY(waitForSignal(queue, SIGNAL(opened()))); + declareQueueAndVerifyConsuming(queue); + + queue->resume(); + QVERIFY(waitForSignal(queue, SIGNAL(resumed()))); +} + QTEST_MAIN(tst_QAMQPQueue) #include "tst_qamqpqueue.moc"