diff --git a/src/amqp_frame.h b/src/amqp_frame.h index 0f7a4de..833abc9 100644 --- a/src/amqp_frame.h +++ b/src/amqp_frame.h @@ -29,7 +29,7 @@ namespace QAMQP { -class QueuePrivate; +class Queue; namespace Frame { typedef quint16 channel_t; @@ -358,7 +358,7 @@ namespace Frame qlonglong bodySize_; private: - friend class QAMQP::QueuePrivate; + friend class QAMQP::Queue; }; class QAMQP_EXPORT ContentBody : public Base diff --git a/src/amqp_queue.cpp b/src/amqp_queue.cpp index 248b791..bce4cec 100644 --- a/src/amqp_queue.cpp +++ b/src/amqp_queue.cpp @@ -130,13 +130,41 @@ void Queue::unbind(Exchange *exchange, const QString &key) void Queue::_q_content(const Frame::Content &frame) { Q_D(Queue); - d->_q_content(frame); + Q_ASSERT(frame.channel() == d->number); + if (frame.channel() != d->number) + return; + + if (d->messages_.isEmpty()) { + qErrnoWarning("Received content-header without method frame before"); + return; + } + + MessagePtr &message = d->messages_.last(); + message->leftSize = frame.bodySize(); + QHash::ConstIterator it; + QHash::ConstIterator itEnd = frame.properties_.constEnd(); + for (it = frame.properties_.constBegin(); it != itEnd; ++it) + message->property[Message::MessageProperty(it.key())] = it.value(); } void Queue::_q_body(const Frame::ContentBody &frame) { Q_D(Queue); - d->_q_body(frame); + Q_ASSERT(frame.channel() == d->number); + if (frame.channel() != d->number) + return; + + if (d->messages_.isEmpty()) { + qErrnoWarning("Received content-body without method frame before"); + return; + } + + MessagePtr &message = d->messages_.last(); + message->payload.append(frame.body()); + message->leftSize -= frame.body().size(); + + if (message->leftSize == 0 && d->messages_.size() == 1) + Q_EMIT messageReceived(this); } MessagePtr Queue::getMessage() @@ -493,42 +521,3 @@ void QueuePrivate::deliver(const Frame::Method &frame) newMessage->deliveryTag = deliveryTag; messages_.enqueue(newMessage); } - -void QueuePrivate::_q_content(const Frame::Content &frame) -{ - Q_ASSERT(frame.channel() == number); - if (frame.channel() != number) - return; - - if (messages_.isEmpty()) { - qErrnoWarning("Received content-header without method frame before"); - return; - } - - MessagePtr &message = messages_.last(); - message->leftSize = frame.bodySize(); - QHash::ConstIterator it; - QHash::ConstIterator itEnd = frame.properties_.constEnd(); - for (it = frame.properties_.constBegin(); it != itEnd; ++it) - message->property[Message::MessageProperty(it.key())] = it.value(); -} - -void QueuePrivate::_q_body(const Frame::ContentBody &frame) -{ - Q_Q(Queue); - Q_ASSERT(frame.channel() == number); - if (frame.channel() != number) - return; - - if (messages_.isEmpty()) { - qErrnoWarning("Received content-body without method frame before"); - return; - } - - MessagePtr &message = messages_.last(); - message->payload.append(frame.body()); - message->leftSize -= frame.body().size(); - - if (message->leftSize == 0 && messages_.size() == 1) - Q_EMIT q->messageReceived(q); -} diff --git a/src/amqp_queue.h b/src/amqp_queue.h index 942d34a..d2cca7e 100644 --- a/src/amqp_queue.h +++ b/src/amqp_queue.h @@ -57,6 +57,7 @@ public: void unbind(Exchange *exchange, const QString &key); MessagePtr getMessage(); + void get(); void ack(const MessagePtr &message); bool hasMessage() const; diff --git a/src/amqp_queue_p.h b/src/amqp_queue_p.h index 4b38339..54e1fa0 100644 --- a/src/amqp_queue_p.h +++ b/src/amqp_queue_p.h @@ -60,9 +60,6 @@ public: bool recievingMessage; - void _q_content(const Frame::Content &frame); - void _q_body(const Frame::ContentBody &frame); - Q_DECLARE_PUBLIC(Queue) };