diff --git a/src/amqp_queue.cpp b/src/amqp_queue.cpp index ea932b1..8b7a9f6 100644 --- a/src/amqp_queue.cpp +++ b/src/amqp_queue.cpp @@ -25,7 +25,7 @@ void Queue::onOpen() { Q_D(Queue); if (d->delayedDeclare) - d->declare(); + declare(); if (!d->delayedBindings.isEmpty()) { typedef QPair BindingPair; @@ -37,8 +37,7 @@ void Queue::onOpen() void Queue::onClose() { - Q_D(Queue); - d->remove(true, true); + remove(true, true); } Queue::QueueOptions Queue::option() const @@ -59,24 +58,60 @@ bool Queue::noAck() const return d->noAck; } -void Queue::declare() -{ - Q_D(Queue); - declare(d->name, QueueOptions(Durable | AutoDelete)); -} - void Queue::declare(const QString &name, QueueOptions options) { Q_D(Queue); - setName(name); + if (!name.isEmpty()) + d->name = name; d->options = options; - d->declare(); + + if (!d->opened) { + d->delayedDeclare = true; + return; + } + + Frame::Method frame(Frame::fcQueue, QueuePrivate::miDeclare); + frame.setChannel(d->number); + + QByteArray arguments; + QDataStream out(&arguments, QIODevice::WriteOnly); + + out << qint16(0); //reserver 1 + Frame::writeField('s', out, d->name); + + out << qint8(options); + Frame::writeField('F', out, Frame::TableField()); + + frame.setArguments(arguments); + d->sendFrame(frame); + d->delayedDeclare = false; } void Queue::remove(bool ifUnused, bool ifEmpty, bool noWait) { Q_D(Queue); - d->remove(ifUnused, ifEmpty, noWait); + if (!d->declared) { + qDebug() << Q_FUNC_INFO << "trying to remove undeclared queue, aborting..."; + return; + } + + Frame::Method frame(Frame::fcQueue, QueuePrivate::miDelete); + frame.setChannel(d->number); + + QByteArray arguments; + QDataStream out(&arguments, QIODevice::WriteOnly); + + out << qint16(0); //reserver 1 + Frame::writeField('s', out, d->name); + + qint8 flag = 0; + flag |= (ifUnused ? 0x1 : 0); + flag |= (ifEmpty ? 0x2 : 0); + flag |= (noWait ? 0x4 : 0); + out << flag; + + frame.setArguments(arguments); + d->sendFrame(frame); } void Queue::purge() @@ -177,12 +212,12 @@ void Queue::_q_content(const Frame::Content &frame) if (frame.channel() != d->number) return; - if (d->messages_.isEmpty()) { + if (d->messages.isEmpty()) { qErrnoWarning("Received content-header without method frame before"); return; } - MessagePtr &message = d->messages_.last(); + MessagePtr &message = d->messages.last(); message->leftSize = frame.bodySize(); QHash::ConstIterator it; QHash::ConstIterator itEnd = frame.properties_.constEnd(); @@ -197,32 +232,32 @@ void Queue::_q_body(const Frame::ContentBody &frame) if (frame.channel() != d->number) return; - if (d->messages_.isEmpty()) { + if (d->messages.isEmpty()) { qErrnoWarning("Received content-body without method frame before"); return; } - MessagePtr &message = d->messages_.last(); + MessagePtr &message = d->messages.last(); message->payload.append(frame.body()); message->leftSize -= frame.body().size(); - if (message->leftSize == 0 && d->messages_.size() == 1) + if (message->leftSize == 0 && d->messages.size() == 1) Q_EMIT messageReceived(this); } MessagePtr Queue::getMessage() { Q_D(Queue); - return d->messages_.dequeue(); + return d->messages.dequeue(); } bool Queue::hasMessage() const { Q_D(const Queue); - if (d->messages_.isEmpty()) + if (d->messages.isEmpty()) return false; - const MessagePtr &q = d->messages_.head(); + const MessagePtr &q = d->messages.head(); return q->leftSize == 0; } @@ -424,52 +459,6 @@ void QueuePrivate::unbindOk(const Frame::Method &frame) QMetaObject::invokeMethod(q, "binded", Q_ARG(bool, false)); } -void QueuePrivate::declare() -{ - if (!opened) { - delayedDeclare = true; - return; - } - - Frame::Method frame(Frame::fcQueue, miDeclare); - frame.setChannel(number); - QByteArray arguments_; - QDataStream out(&arguments_, QIODevice::WriteOnly); - out << qint16(0); //reserver 1 - Frame::writeField('s', out, name); - out << qint8(options); - Frame::writeField('F', out, Frame::TableField()); - - frame.setArguments(arguments_); - sendFrame(frame); - delayedDeclare = false; -} - -void QueuePrivate::remove(bool ifUnused, bool ifEmpty, bool noWait) -{ - if (!declared) - return; - - Frame::Method frame(Frame::fcQueue, miDelete); - frame.setChannel(number); - QByteArray arguments_; - QDataStream out(&arguments_, QIODevice::WriteOnly); - - out << qint16(0); //reserver 1 - Frame::writeField('s', out, name); - - qint8 flag = 0; - - flag |= (ifUnused ? 0x1 : 0); - flag |= (ifEmpty ? 0x2 : 0); - flag |= (noWait ? 0x4 : 0); - - out << flag; - - frame.setArguments(arguments_); - sendFrame(frame); -} - void QueuePrivate::getOk(const Frame::Method &frame) { QByteArray data = frame.arguments(); @@ -486,7 +475,7 @@ void QueuePrivate::getOk(const Frame::Method &frame) newMessage->routeKey = routingKey; newMessage->exchangeName = exchangeName; newMessage->deliveryTag = deliveryTag; - messages_.enqueue(newMessage); + messages.enqueue(newMessage); } void QueuePrivate::consumeOk(const Frame::Method &frame) @@ -519,5 +508,5 @@ void QueuePrivate::deliver(const Frame::Method &frame) newMessage->routeKey = routingKey; newMessage->exchangeName = exchangeName; newMessage->deliveryTag = deliveryTag; - messages_.enqueue(newMessage); + messages.enqueue(newMessage); } diff --git a/src/amqp_queue.h b/src/amqp_queue.h index d2cca7e..d0fa3ea 100644 --- a/src/amqp_queue.h +++ b/src/amqp_queue.h @@ -44,8 +44,8 @@ public: ~Queue(); QueueOptions option() const; - void declare(); - void declare(const QString &name, QueueOptions options); + void declare(const QString &name = QString(), + QueueOptions options = QueueOptions(Durable | AutoDelete)); void remove(bool ifUnused = true, bool ifEmpty = true, bool noWait = true); void purge(); diff --git a/src/amqp_queue_p.h b/src/amqp_queue_p.h index e3b6526..7ca9511 100644 --- a/src/amqp_queue_p.h +++ b/src/amqp_queue_p.h @@ -23,9 +23,6 @@ public: QueuePrivate(Queue *q); ~QueuePrivate(); - void declare(); - void remove(bool ifUnused = true, bool ifEmpty = true, bool noWait = true); - // method handler related virtual bool _q_method(const Frame::Method &frame); void declareOk(const Frame::Method &frame); @@ -36,17 +33,14 @@ public: void consumeOk(const Frame::Method &frame); void deliver(const Frame::Method &frame); - QString type; Queue::QueueOptions options; bool delayedDeclare; bool declared; bool noAck; QString consumerTag; - QQueue > delayedBindings; - QQueue messages_; - + QQueue messages; bool recievingMessage; Q_DECLARE_PUBLIC(Queue)