diff --git a/src/amqp_channel.cpp b/src/amqp_channel.cpp index 2273fcd..fe9959f 100644 --- a/src/amqp_channel.cpp +++ b/src/amqp_channel.cpp @@ -13,6 +13,7 @@ ChannelPrivate::ChannelPrivate(Channel *q) : channelNumber(0), opened(false), needOpen(true), + error(Channel::NoError), q_ptr(q) { } @@ -118,15 +119,18 @@ void ChannelPrivate::flow() void ChannelPrivate::flow(const Frame::Method &frame) { Q_UNUSED(frame); + qDebug() << Q_FUNC_INFO; } void ChannelPrivate::flowOk() { + qDebug() << Q_FUNC_INFO; } void ChannelPrivate::flowOk(const Frame::Method &frame) { Q_UNUSED(frame); + qDebug() << Q_FUNC_INFO; } void ChannelPrivate::close(int code, const QString &text, int classId, int methodId) @@ -148,6 +152,7 @@ void ChannelPrivate::close(int code, const QString &text, int classId, int metho void ChannelPrivate::close(const Frame::Method &frame) { + Q_Q(Channel); qDebug(">> CLOSE"); stateChanged(csClosed); QByteArray data = frame.arguments(); @@ -158,6 +163,13 @@ void ChannelPrivate::close(const Frame::Method &frame) stream >> classId; stream >> methodId; + Channel::ChannelError checkError = static_cast(code); + if (checkError != Channel::NoError) { + error = checkError; + errorString = qPrintable(text); + Q_EMIT q->error(error); + } + qDebug(">> code: %d", code); qDebug(">> text: %s", qPrintable(text)); qDebug(">> class-id: %d", classId); @@ -270,4 +282,16 @@ void Channel::setQOS(qint32 prefetchSize, quint16 prefetchCount) d->setQOS(prefetchSize, prefetchCount); } +Channel::ChannelError Channel::error() const +{ + Q_D(const Channel); + return d->error; +} + +QString Channel::errorString() const +{ + Q_D(const Channel); + return d->errorString; +} + #include "moc_amqp_channel.cpp" diff --git a/src/amqp_channel.h b/src/amqp_channel.h index aa677f7..017303e 100644 --- a/src/amqp_channel.h +++ b/src/amqp_channel.h @@ -24,6 +24,18 @@ public: QString name() const; void setName(const QString &name); + enum ChannelError { + NoError = 0, + ContentTooLarge = 311, + NoConsumers = 313, + AccessRefused = 403, + NotFound = 404, + ResourceLocked = 405, + PreconditionFailed = 406 + }; + ChannelError error() const; + QString errorString() const; + public Q_SLOTS: void closeChannel(); void reopen(); @@ -33,6 +45,7 @@ Q_SIGNALS: void opened(); void closed(); void flowChanged(bool enabled); + void error(ChannelError error); protected: virtual void channelOpened() = 0; diff --git a/src/amqp_channel_p.h b/src/amqp_channel_p.h index 92a63a9..ec17cdf 100644 --- a/src/amqp_channel_p.h +++ b/src/amqp_channel_p.h @@ -77,6 +77,9 @@ public: bool opened; bool needOpen; + Channel::ChannelError error; + QString errorString; + Q_DECLARE_PUBLIC(Channel) Channel * const q_ptr; }; diff --git a/src/amqp_client.cpp b/src/amqp_client.cpp index 2d7b1b2..41ed293 100644 --- a/src/amqp_client.cpp +++ b/src/amqp_client.cpp @@ -21,6 +21,7 @@ ClientPrivate::ClientPrivate(Client *q) socket(0), closed(false), connected(false), + error(Client::NoError), q_ptr(q) { } @@ -335,6 +336,13 @@ void ClientPrivate::close(const Frame::Method &frame) stream >> classId; stream >> methodId; + Client::ConnectionError checkError = static_cast(code); + if (checkError != Client::NoError) { + error = checkError; + errorString = qPrintable(text); + Q_EMIT q->error(error); + } + qDebug(">> code: %d", code); qDebug(">> text: %s", qPrintable(text)); qDebug(">> class-id: %d", classId); diff --git a/src/amqp_client.h b/src/amqp_client.h index 24f8395..70b0215 100644 --- a/src/amqp_client.h +++ b/src/amqp_client.h @@ -61,6 +61,23 @@ public: void addCustomProperty(const QString &name, const QString &value); QString customProperty(const QString &name) const; + enum ConnectionError { + NoError = 0, + ConnectionForced = 320, + InvalidPath = 402, + FrameError = 501, + SyntaxError = 502, + CommandInvalid = 503, + ChannelError = 504, + UnexpectedFrame = 505, + ResourceError = 506, + NotAllowed = 530, + NotImplemented = 540, + InternalError = 541 + }; + ConnectionError error() const; + QString errorString() const; + // channels Exchange *createExchange(int channelNumber = -1); Exchange *createExchange(const QString &name, int channelNumber = -1); @@ -76,6 +93,7 @@ public: Q_SIGNALS: void connected(); void disconnected(); + void error(ConnectionError error); protected: Client(ClientPrivate *dd, QObject *parent = 0); diff --git a/src/amqp_client_p.h b/src/amqp_client_p.h index 25b0eaa..a38279d 100644 --- a/src/amqp_client_p.h +++ b/src/amqp_client_p.h @@ -91,6 +91,9 @@ public: QPointer heartbeatTimer; Frame::TableField customProperties; + Client::ConnectionError error; + QString errorString; + Client * const q_ptr; Q_DECLARE_PUBLIC(Client) diff --git a/src/amqp_queue.cpp b/src/amqp_queue.cpp index 44fba5b..7172082 100644 --- a/src/amqp_queue.cpp +++ b/src/amqp_queue.cpp @@ -34,7 +34,7 @@ bool QueuePrivate::_q_method(const Frame::Method &frame) case miDeclareOk: declareOk(frame); break; - case miDelete: + case miDeleteOk: deleteOk(frame); break; case miBindOk: @@ -180,8 +180,6 @@ void QueuePrivate::getOk(const Frame::Method &frame) void QueuePrivate::consumeOk(const Frame::Method &frame) { qDebug() << "Consume ok: " << name; - declared = false; - QByteArray data = frame.arguments(); QDataStream stream(&data, QIODevice::ReadOnly); consumerTag = Frame::readField('s',stream).toString(); @@ -190,6 +188,7 @@ void QueuePrivate::consumeOk(const Frame::Method &frame) void QueuePrivate::deliver(const Frame::Method &frame) { + qDebug() << Q_FUNC_INFO; QByteArray data = frame.arguments(); QDataStream in(&data, QIODevice::ReadOnly); QString consumer_ = Frame::readField('s',in).toString(); @@ -206,6 +205,41 @@ void QueuePrivate::deliver(const Frame::Method &frame) messages.enqueue(message); } +void QueuePrivate::declare() +{ + if (name.isEmpty()) { + qDebug() << Q_FUNC_INFO << "can't declare queue with no name"; + return; + } + + Frame::Method frame(Frame::fcQueue, QueuePrivate::miDeclare); + frame.setChannel(channelNumber); + + QByteArray arguments; + QDataStream out(&arguments, QIODevice::WriteOnly); + + out << qint16(0); //reserved 1 + Frame::writeField('s', out, name); + + qDebug() << "DECLARE OPTIONS: "; + if (options & Queue::NoOptions) qDebug() << "NoOptions"; + if (options & Queue::Passive) qDebug() << "Passive"; + if (options & Queue::Durable) qDebug() << "Durable"; + if (options & Queue::Exclusive) qDebug() << "Exclusive"; + if (options & Queue::AutoDelete) qDebug() << "AutoDelete"; + if (options & Queue::NoWait) qDebug() << "NoWait"; + + out << qint8(options); + Frame::writeField('F', out, Frame::TableField()); + + frame.setArguments(arguments); + sendFrame(frame); + + if (delayedDeclare) + delayedDeclare = false; +} + + ////////////////////////////////////////////////////////////////////////// Queue::Queue(int channelNumber, Client *parent) @@ -217,14 +251,13 @@ Queue::Queue(int channelNumber, Client *parent) Queue::~Queue() { - remove(); } void Queue::channelOpened() { Q_D(Queue); if (d->delayedDeclare) - declare(); + d->declare(); if (!d->delayedBindings.isEmpty()) { typedef QPair BindingPair; @@ -236,10 +269,9 @@ void Queue::channelOpened() void Queue::channelClosed() { - remove(true, true); } -Queue::QueueOptions Queue::option() const +int Queue::options() const { Q_D(const Queue); return d->options; @@ -257,7 +289,7 @@ bool Queue::noAck() const return d->noAck; } -void Queue::declare(const QString &name, QueueOptions options) +void Queue::declare(const QString &name, int options) { Q_D(Queue); if (!name.isEmpty()) @@ -269,24 +301,10 @@ void Queue::declare(const QString &name, QueueOptions options) return; } - Frame::Method frame(Frame::fcQueue, QueuePrivate::miDeclare); - frame.setChannel(d->channelNumber); - - QByteArray arguments; - QDataStream out(&arguments, QIODevice::WriteOnly); - - out << qint16(0); //reserved 1 - Frame::writeField('s', out, d->name); - - out << qint8(options); - Frame::writeField('F', out, Frame::TableField()); - - frame.setArguments(arguments); - d->sendFrame(frame); - d->delayedDeclare = false; + d->declare(); } -void Queue::remove(bool ifUnused, bool ifEmpty, bool noWait) +void Queue::remove(int options) { Q_D(Queue); if (!d->declared) { @@ -302,12 +320,7 @@ void Queue::remove(bool ifUnused, bool ifEmpty, bool noWait) out << qint16(0); //reserved 1 Frame::writeField('s', out, d->name); - - qint8 flag = 0; - flag |= (ifUnused ? 0x1 : 0); - flag |= (ifEmpty ? 0x2 : 0); - flag |= (noWait ? 0x4 : 0); - out << flag; + out << qint8(options); frame.setArguments(arguments); d->sendFrame(frame); @@ -420,7 +433,7 @@ bool Queue::hasMessage() const return message.d->leftSize == 0; } -void Queue::consume(ConsumeOptions options) +void Queue::consume(int options) { Q_D(Queue); if (!d->opened) { diff --git a/src/amqp_queue.h b/src/amqp_queue.h index 36a2813..db3957e 100644 --- a/src/amqp_queue.h +++ b/src/amqp_queue.h @@ -16,9 +16,12 @@ class QAMQP_EXPORT Queue : public Channel { Q_OBJECT Q_ENUMS(QueueOptions) - Q_PROPERTY(QueueOptions option READ option) + Q_PROPERTY(int options READ options CONSTANT) Q_PROPERTY(QString consumerTag READ consumerTag WRITE setConsumerTag) Q_PROPERTY(bool noAck READ noAck WRITE setNoAck) + Q_ENUMS(QueueOption) + Q_ENUMS(ConsumeOption) + Q_ENUMS(RemoveOption) public: enum QueueOption { @@ -30,6 +33,7 @@ public: NoWait = 0x10 }; Q_DECLARE_FLAGS(QueueOptions, QueueOption) + int options() const; enum ConsumeOption { coNoLocal = 0x1, @@ -39,8 +43,14 @@ public: }; Q_DECLARE_FLAGS(ConsumeOptions, ConsumeOption) + enum RemoveOption { + roIfUnused = 0x1, + roIfEmpty = 0x02, + roNoWait = 0x04 + }; + Q_DECLARE_FLAGS(RemoveOptions, RemoveOption) + ~Queue(); - QueueOptions option() const; bool hasMessage() const; Message getMessage(); @@ -52,17 +62,16 @@ public: bool noAck() const; // AMQP Queue - void declare(const QString &name = QString(), - QueueOptions options = QueueOptions(Durable | AutoDelete)); + void declare(const QString &name = QString(), int options = Durable|AutoDelete); void bind(const QString &exchangeName, const QString &key); void bind(Exchange *exchange, const QString &key); void unbind(const QString &exchangeName, const QString &key); void unbind(Exchange *exchange, const QString &key); void purge(); - void remove(bool ifUnused = true, bool ifEmpty = true, bool noWait = true); + void remove(int options = roIfUnused|roIfEmpty|roNoWait); // AMQP Basic - void consume(ConsumeOptions options = ConsumeOptions(NoOptions)); + void consume(int options = NoOptions); void get(); void ack(const Message &message); @@ -80,7 +89,7 @@ protected: virtual void channelClosed(); private: - Queue(int channelNumber = -1, Client *parent = 0); + explicit Queue(int channelNumber = -1, Client *parent = 0); Q_DISABLE_COPY(Queue) Q_DECLARE_PRIVATE(Queue) diff --git a/src/amqp_queue_p.h b/src/amqp_queue_p.h index e0778da..bffd5c1 100644 --- a/src/amqp_queue_p.h +++ b/src/amqp_queue_p.h @@ -23,6 +23,8 @@ public: QueuePrivate(Queue *q); ~QueuePrivate(); + void declare(); + // method handler related virtual bool _q_method(const Frame::Method &frame); virtual void _q_content(const Frame::Content &frame); @@ -36,7 +38,7 @@ public: void deliver(const Frame::Method &frame); QString type; - Queue::QueueOptions options; + int options; bool delayedDeclare; bool declared; bool noAck; diff --git a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp index e13d624..92e80f4 100644 --- a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp +++ b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp @@ -1,3 +1,5 @@ +#include + #include #include "amqp_testcase.h" @@ -10,29 +12,87 @@ class tst_QAMQPQueue : public TestCase { Q_OBJECT private Q_SLOTS: + void init(); + void cleanup(); + void defaultExchange(); + void remove(); + void removeIfUnused(); + +private: // disabled + void removeIfEmpty(); + +private: + QScopedPointer client; + }; +void tst_QAMQPQueue::init() +{ + client.reset(new Client); + client->connectToHost(); + QVERIFY(waitForSignal(client.data(), SIGNAL(connected()))); +} + +void tst_QAMQPQueue::cleanup() +{ + client->disconnectFromHost(); + QVERIFY(waitForSignal(client.data(), SIGNAL(disconnected()))); +} + void tst_QAMQPQueue::defaultExchange() { - Client client; - client.connectToHost(); - QVERIFY(waitForSignal(&client, SIGNAL(connected()))); - - Queue *queue = client.createQueue("test-default-exchange"); + Queue *queue = client->createQueue("test-default-exchange"); queue->declare(); QVERIFY(waitForSignal(queue, SIGNAL(declared()))); queue->consume(); - Exchange *defaultExchange = client.createExchange(); + Exchange *defaultExchange = client->createExchange(); defaultExchange->publish("test-default-exchange", "first message"); QVERIFY(waitForSignal(queue, SIGNAL(messageReceived()))); Message message = queue->getMessage(); QCOMPARE(message.payload(), QByteArray("first message")); +} - client.disconnectFromHost(); - QVERIFY(waitForSignal(&client, SIGNAL(disconnected()))); +void tst_QAMQPQueue::remove() +{ + Queue *queue = client->createQueue("test-remove"); + queue->declare(); + QVERIFY(waitForSignal(queue, SIGNAL(declared()))); + queue->remove(Queue::roIfEmpty|Queue::roIfUnused); + QVERIFY(waitForSignal(queue, SIGNAL(removed()))); +} + +void tst_QAMQPQueue::removeIfUnused() +{ + Queue *queue = client->createQueue("test-remove-if-unused"); + queue->declare(); + QVERIFY(waitForSignal(queue, SIGNAL(declared()))); + queue->consume(); + + queue->remove(Queue::roIfUnused); + QVERIFY(waitForSignal(queue, SIGNAL(error(ChannelError)))); + QCOMPARE(queue->error(), Channel::PreconditionFailed); +} + +void tst_QAMQPQueue::removeIfEmpty() +{ + // NOTE: this will work once I refactor messages to easily + // add propertis for e.g. persistence + + Queue *queue = client->createQueue(); + queue->declare("test-remove-if-empty", Queue::Durable); + QVERIFY(waitForSignal(queue, SIGNAL(declared()))); + queue->consume(); + + Exchange *defaultExchange = client->createExchange(); + defaultExchange->publish("test-remove-if-empty", "first message"); + QVERIFY(waitForSignal(queue, SIGNAL(messageReceived()))); + + queue->remove(Queue::roIfEmpty); + QVERIFY(waitForSignal(queue, SIGNAL(error(ChannelError)))); + QCOMPARE(queue->error(), Channel::PreconditionFailed); } QTEST_MAIN(tst_QAMQPQueue)