merge _q_body/_q_content from QueuePrivate to Queue

This commit is contained in:
Matt Broadstone 2014-05-30 14:57:09 -04:00
parent e5e3036d03
commit 1c3bd9fdf1
4 changed files with 33 additions and 46 deletions

View File

@ -29,7 +29,7 @@
namespace QAMQP
{
class QueuePrivate;
class Queue;
namespace Frame
{
typedef quint16 channel_t;
@ -358,7 +358,7 @@ namespace Frame
qlonglong bodySize_;
private:
friend class QAMQP::QueuePrivate;
friend class QAMQP::Queue;
};
class QAMQP_EXPORT ContentBody : public Base

View File

@ -130,13 +130,41 @@ void Queue::unbind(Exchange *exchange, const QString &key)
void Queue::_q_content(const Frame::Content &frame)
{
Q_D(Queue);
d->_q_content(frame);
Q_ASSERT(frame.channel() == d->number);
if (frame.channel() != d->number)
return;
if (d->messages_.isEmpty()) {
qErrnoWarning("Received content-header without method frame before");
return;
}
MessagePtr &message = d->messages_.last();
message->leftSize = frame.bodySize();
QHash<int, QVariant>::ConstIterator it;
QHash<int, QVariant>::ConstIterator itEnd = frame.properties_.constEnd();
for (it = frame.properties_.constBegin(); it != itEnd; ++it)
message->property[Message::MessageProperty(it.key())] = it.value();
}
void Queue::_q_body(const Frame::ContentBody &frame)
{
Q_D(Queue);
d->_q_body(frame);
Q_ASSERT(frame.channel() == d->number);
if (frame.channel() != d->number)
return;
if (d->messages_.isEmpty()) {
qErrnoWarning("Received content-body without method frame before");
return;
}
MessagePtr &message = d->messages_.last();
message->payload.append(frame.body());
message->leftSize -= frame.body().size();
if (message->leftSize == 0 && d->messages_.size() == 1)
Q_EMIT messageReceived(this);
}
MessagePtr Queue::getMessage()
@ -493,42 +521,3 @@ void QueuePrivate::deliver(const Frame::Method &frame)
newMessage->deliveryTag = deliveryTag;
messages_.enqueue(newMessage);
}
void QueuePrivate::_q_content(const Frame::Content &frame)
{
Q_ASSERT(frame.channel() == number);
if (frame.channel() != number)
return;
if (messages_.isEmpty()) {
qErrnoWarning("Received content-header without method frame before");
return;
}
MessagePtr &message = messages_.last();
message->leftSize = frame.bodySize();
QHash<int, QVariant>::ConstIterator it;
QHash<int, QVariant>::ConstIterator itEnd = frame.properties_.constEnd();
for (it = frame.properties_.constBegin(); it != itEnd; ++it)
message->property[Message::MessageProperty(it.key())] = it.value();
}
void QueuePrivate::_q_body(const Frame::ContentBody &frame)
{
Q_Q(Queue);
Q_ASSERT(frame.channel() == number);
if (frame.channel() != number)
return;
if (messages_.isEmpty()) {
qErrnoWarning("Received content-body without method frame before");
return;
}
MessagePtr &message = messages_.last();
message->payload.append(frame.body());
message->leftSize -= frame.body().size();
if (message->leftSize == 0 && messages_.size() == 1)
Q_EMIT q->messageReceived(q);
}

View File

@ -57,6 +57,7 @@ public:
void unbind(Exchange *exchange, const QString &key);
MessagePtr getMessage();
void get();
void ack(const MessagePtr &message);
bool hasMessage() const;

View File

@ -60,9 +60,6 @@ public:
bool recievingMessage;
void _q_content(const Frame::Content &frame);
void _q_body(const Frame::ContentBody &frame);
Q_DECLARE_PUBLIC(Queue)
};