diff --git a/src/amqp_channel.cpp b/src/amqp_channel.cpp index 9da6da2..0e98a72 100644 --- a/src/amqp_channel.cpp +++ b/src/amqp_channel.cpp @@ -34,25 +34,6 @@ void ChannelPrivate::init(int channel, Client *c) nextChannelNumber = qMax(channelNumber, (nextChannelNumber + 1)); } -void ChannelPrivate::stateChanged(State state) -{ - Q_Q(Channel); - switch(ChannelPrivate::State(state)) { - case ChannelPrivate::csOpened: - Q_EMIT q->opened(); - break; - case ChannelPrivate::csClosed: - Q_EMIT q->closed(); - break; - case ChannelPrivate::csIdle: - Q_EMIT q->flowChanged(false); - break; - case ChannelPrivate::csRunning: - Q_EMIT q->flowChanged(true); - break; - } -} - bool ChannelPrivate::_q_method(const Frame::Method &frame) { Q_ASSERT(frame.channel() == channelNumber); @@ -129,10 +110,21 @@ void ChannelPrivate::open() sendFrame(frame); } -void ChannelPrivate::flow() +void ChannelPrivate::flow(bool active) { + QByteArray arguments; + QDataStream stream(&arguments, QIODevice::WriteOnly); + Frame::writeAmqpField(stream, ShortShortUint, (active ? 1 : 0)); + + Frame::Method frame(Frame::fcChannel, miFlow); + frame.setChannel(channelNumber); + frame.setArguments(arguments); + sendFrame(frame); } +// NOTE: not implemented until I can figure out a good way to force the server +// to pause the channel in a test. It seems like RabbitMQ just doesn't +// care about flow control, preferring rather to use basic.qos void ChannelPrivate::flow(const Frame::Method &frame) { Q_UNUSED(frame); @@ -146,8 +138,14 @@ void ChannelPrivate::flowOk() void ChannelPrivate::flowOk(const Frame::Method &frame) { - Q_UNUSED(frame); - qAmqpDebug() << Q_FUNC_INFO; + Q_Q(Channel); + QByteArray data = frame.arguments(); + QDataStream stream(&data, QIODevice::ReadOnly); + bool active = Frame::readAmqpField(stream, Boolean).toBool(); + if (active) + Q_EMIT q->resumed(); + else + Q_EMIT q->paused(); } void ChannelPrivate::close(int code, const QString &text, int classId, int methodId) @@ -176,7 +174,6 @@ void ChannelPrivate::close(const Frame::Method &frame) { Q_Q(Channel); qAmqpDebug(">> CLOSE"); - stateChanged(csClosed); QByteArray data = frame.arguments(); QDataStream stream(&data, QIODevice::ReadOnly); qint16 code = 0, classId, methodId; @@ -197,6 +194,7 @@ void ChannelPrivate::close(const Frame::Method &frame) qAmqpDebug(">> text: %s", qPrintable(text)); qAmqpDebug(">> class-id: %d", classId); qAmqpDebug(">> method-id: %d", methodId); + Q_EMIT q->closed(); } void ChannelPrivate::closeOk() @@ -205,24 +203,20 @@ void ChannelPrivate::closeOk() sendFrame(frame); } -void ChannelPrivate::closeOk(const Frame::Method &frame) +void ChannelPrivate::closeOk(const Frame::Method &) { - Q_UNUSED(frame) Q_Q(Channel); - - stateChanged(csClosed); + Q_EMIT q->closed(); q->channelClosed(); opened = false; } -void ChannelPrivate::openOk(const Frame::Method &frame) +void ChannelPrivate::openOk(const Frame::Method &) { - Q_UNUSED(frame) Q_Q(Channel); - qAmqpDebug(">> OpenOK"); opened = true; - stateChanged(csOpened); + Q_EMIT q->opened(); q->channelOpened(); } @@ -346,4 +340,10 @@ QString Channel::errorString() const return d->errorString; } +void Channel::resume() +{ + Q_D(Channel); + d->flow(true); +} + #include "moc_amqp_channel.cpp" diff --git a/src/amqp_channel.h b/src/amqp_channel.h index 173eb5f..8dedb4b 100644 --- a/src/amqp_channel.h +++ b/src/amqp_channel.h @@ -36,11 +36,13 @@ public: public Q_SLOTS: void closeChannel(); void reopen(); + void resume(); Q_SIGNALS: void opened(); void closed(); - void flowChanged(bool enabled); + void resumed(); + void paused(); void error(QAMQP::Error error); void qosDefined(); diff --git a/src/amqp_channel_p.h b/src/amqp_channel_p.h index 1f97bca..55ff08d 100644 --- a/src/amqp_channel_p.h +++ b/src/amqp_channel_p.h @@ -21,13 +21,6 @@ public: METHOD_ID_ENUM(miClose, 40) }; - enum State { - csOpened, - csClosed, - csIdle, - csRunning - }; - enum BasicMethod { METHOD_ID_ENUM(bmQos, 10), METHOD_ID_ENUM(bmConsume, 20), @@ -47,12 +40,10 @@ public: virtual ~ChannelPrivate(); void init(int channel, Client *client); - void stateChanged(State state); - void sendFrame(const Frame::Base &frame); void open(); - void flow(); + void flow(bool active); void flowOk(); void close(int code, const QString &text, int classId, int methodId); void closeOk(); diff --git a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp index 0c66320..b7dfa82 100644 --- a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp +++ b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp @@ -47,6 +47,7 @@ private Q_SLOTS: void tableFieldDataTypes(); void messageProperties(); void closeChannel(); + void resumeChannel(); private: void declareQueueAndVerifyConsuming(Queue *queue); @@ -675,5 +676,15 @@ void tst_QAMQPQueue::closeChannel() QVERIFY(waitForSignal(queue, SIGNAL(closed()))); } +void tst_QAMQPQueue::resumeChannel() +{ + Queue *queue = client->createQueue("test-resume"); + QVERIFY(waitForSignal(queue, SIGNAL(opened()))); + declareQueueAndVerifyConsuming(queue); + + queue->resume(); + QVERIFY(waitForSignal(queue, SIGNAL(resumed()))); +} + QTEST_MAIN(tst_QAMQPQueue) #include "tst_qamqpqueue.moc"