diff --git a/src/amqp_frame.h b/src/amqp_frame.h index 46eb3af..b663e1d 100644 --- a/src/amqp_frame.h +++ b/src/amqp_frame.h @@ -246,7 +246,6 @@ namespace Frame }; - /* * @brief Class for working with content frames. * @detailed Implement main methods for serialize and deserialize raw content frame data. @@ -260,8 +259,8 @@ namespace Frame * * | Property | Description | * | ---------- | ----------- | - * |cpContentType | MIME content type | - * | ocpContentEncoding | MIME content encoding | + * | cpContentType | MIME content type | + * | cpContentEncoding | MIME content encoding | * | cpHeaders | message header field table | * | cpDeliveryMode| nonpersistent (1) or persistent (2) | * | cpPriority | message priority, 0 to 9 | diff --git a/src/amqp_queue.cpp b/src/amqp_queue.cpp index 86469a7..d9f1687 100644 --- a/src/amqp_queue.cpp +++ b/src/amqp_queue.cpp @@ -44,9 +44,7 @@ bool QueuePrivate::_q_method(const Frame::Method &frame) unbindOk(frame); break; case miPurgeOk: - deleteOk(frame); - break; - default: + purgeOk(frame); break; } @@ -67,9 +65,8 @@ bool QueuePrivate::_q_method(const Frame::Method &frame) case bmGetEmpty: Q_EMIT q->empty(); break; - default: - break; } + return true; } @@ -132,10 +129,24 @@ void QueuePrivate::declareOk(const Frame::Method &frame) Q_EMIT q->declared(); } +void QueuePrivate::purgeOk(const Frame::Method &frame) +{ + Q_Q(Queue); + qAmqpDebug() << "purged queue: " << name; + + QByteArray data = frame.arguments(); + QDataStream stream(&data, QIODevice::ReadOnly); + + qint32 messageCount = 0; + stream >> messageCount; + + Q_EMIT q->purged(messageCount); +} + void QueuePrivate::deleteOk(const Frame::Method &frame) { Q_Q(Queue); - qAmqpDebug() << "Deleted or purged queue: " << name; + qAmqpDebug() << "deleted queue: " << name; declared = false; QByteArray data = frame.arguments(); @@ -291,6 +302,11 @@ void Queue::declare(int options) d->declare(); } +void Queue::forceRemove() +{ + remove(0); +} + void Queue::remove(int options) { Q_D(Queue); @@ -420,7 +436,7 @@ void Queue::consume(int options) Frame::writeField('s', out, d->name); Frame::writeField('s', out, d->consumerTag); - out << qint8(options); // no-wait + out << qint8(options); Frame::writeField('F', out, Frame::TableField()); frame.setArguments(arguments); diff --git a/src/amqp_queue.h b/src/amqp_queue.h index f6a0b10..efdff5c 100644 --- a/src/amqp_queue.h +++ b/src/amqp_queue.h @@ -68,6 +68,7 @@ public: void unbind(Exchange *exchange, const QString &key); void purge(); void remove(int options = roIfUnused|roIfEmpty|roNoWait); + void forceRemove(); // AMQP Basic void consume(int options = NoOptions); @@ -81,6 +82,7 @@ Q_SIGNALS: void removed(); void messageReceived(); void empty(); + void purged(int messageCount); protected: // reimp Channel diff --git a/src/amqp_queue_p.h b/src/amqp_queue_p.h index 27f1c54..7fc8605 100644 --- a/src/amqp_queue_p.h +++ b/src/amqp_queue_p.h @@ -31,6 +31,7 @@ public: virtual void _q_body(const Frame::ContentBody &frame); void declareOk(const Frame::Method &frame); void deleteOk(const Frame::Method &frame); + void purgeOk(const Frame::Method &frame); void bindOk(const Frame::Method &frame); void unbindOk(const Frame::Method &frame); void getOk(const Frame::Method &frame);