diff --git a/src/amqp_queue.cpp b/src/amqp_queue.cpp index a4c563a..48af814 100644 --- a/src/amqp_queue.cpp +++ b/src/amqp_queue.cpp @@ -75,22 +75,20 @@ bool QueuePrivate::_q_method(const Frame::Method &frame) void QueuePrivate::_q_content(const Frame::Content &frame) { - Q_Q(Queue); Q_ASSERT(frame.channel() == channelNumber); if (frame.channel() != channelNumber) return; - if (q->isEmpty()) { - qErrnoWarning("Received content-header without method frame before"); + if (!currentMessage.isValid()) { + qAmqpDebug() << "received content-header without delivered message"; return; } - Message message = q->last(); - message.d->leftSize = frame.bodySize(); + currentMessage.d->leftSize = frame.bodySize(); QHash::ConstIterator it; QHash::ConstIterator itEnd = frame.properties_.constEnd(); for (it = frame.properties_.constBegin(); it != itEnd; ++it) - message.d->properties[MessageProperty(it.key())] = it.value(); + currentMessage.d->properties[MessageProperty(it.key())] = it.value(); } void QueuePrivate::_q_body(const Frame::ContentBody &frame) @@ -100,22 +98,23 @@ void QueuePrivate::_q_body(const Frame::ContentBody &frame) if (frame.channel() != channelNumber) return; - if (q->isEmpty()) { - qErrnoWarning("Received content-body without method frame before"); + if (!currentMessage.isValid()) { + qAmqpDebug() << "received content-body without delivered message"; return; } - Message message = q->last(); - message.d->payload.append(frame.body()); - message.d->leftSize -= frame.body().size(); - if (message.d->leftSize == 0 && q->size() == 1) + currentMessage.d->payload.append(frame.body()); + currentMessage.d->leftSize -= frame.body().size(); + if (currentMessage.d->leftSize == 0) { + q->enqueue(currentMessage); Q_EMIT q->messageReceived(); + } } void QueuePrivate::declareOk(const Frame::Method &frame) { Q_Q(Queue); - qAmqpDebug() << "Declared queue: " << name; + qAmqpDebug() << "declared queue: " << name; declared = true; QByteArray data = frame.arguments(); @@ -124,7 +123,7 @@ void QueuePrivate::declareOk(const Frame::Method &frame) name = Frame::readField('s', stream).toString(); qint32 messageCount = 0, consumerCount = 0; stream >> messageCount >> consumerCount; - qAmqpDebug("Message count %d\nConsumer count: %d", messageCount, consumerCount); + qAmqpDebug("message count %d\nConsumer count: %d", messageCount, consumerCount); Q_EMIT q->declared(); } @@ -172,13 +171,12 @@ void QueuePrivate::unbindOk(const Frame::Method &frame) Q_UNUSED(frame) Q_Q(Queue); - qAmqpDebug() << Q_FUNC_INFO << "unbound exchange"; + qAmqpDebug() << Q_FUNC_INFO << "unbound from exchange"; Q_EMIT q->unbound(); } void QueuePrivate::getOk(const Frame::Method &frame) { - Q_Q(Queue); QByteArray data = frame.arguments(); QDataStream in(&data, QIODevice::ReadOnly); @@ -187,27 +185,27 @@ void QueuePrivate::getOk(const Frame::Method &frame) message.d->redelivered = Frame::readField('t',in).toBool(); message.d->exchangeName = Frame::readField('s',in).toString(); message.d->routingKey = Frame::readField('s',in).toString(); - q->enqueue(message); + currentMessage = message; } void QueuePrivate::consumeOk(const Frame::Method &frame) { - qAmqpDebug() << "Consume ok: " << name; + qAmqpDebug() << "consume ok: " << name; QByteArray data = frame.arguments(); QDataStream stream(&data, QIODevice::ReadOnly); - consumerTag = Frame::readField('s',stream).toString(); - qAmqpDebug("Consumer tag = %s", qPrintable(consumerTag)); + QString consumerTag = Frame::readField('s',stream).toString(); + qAmqpDebug("consumer tag = %s", qPrintable(consumerTag)); + consumerTags.append(consumerTag); } void QueuePrivate::deliver(const Frame::Method &frame) { - Q_Q(Queue); qAmqpDebug() << Q_FUNC_INFO; QByteArray data = frame.arguments(); QDataStream in(&data, QIODevice::ReadOnly); - QString consumer_ = Frame::readField('s',in).toString(); - if (consumer_ != consumerTag) { - qAmqpDebug() << Q_FUNC_INFO << "invalid consumer tag: " << consumer_; + QString consumer = Frame::readField('s',in).toString(); + if (!consumerTags.contains(consumer)) { + qAmqpDebug() << Q_FUNC_INFO << "invalid consumer tag: " << consumer; return; } @@ -216,7 +214,7 @@ void QueuePrivate::deliver(const Frame::Method &frame) message.d->redelivered = Frame::readField('t',in).toBool(); message.d->exchangeName = Frame::readField('s',in).toString(); message.d->routingKey = Frame::readField('s',in).toString(); - q->enqueue(message); + currentMessage = message; } void QueuePrivate::declare() @@ -429,7 +427,7 @@ void Queue::consume(int options) out << qint16(0); //reserved 1 Frame::writeField('s', out, d->name); - Frame::writeField('s', out, d->consumerTag); + Frame::writeField('s', out, d->explicitConsumerTag); out << qint8(options); Frame::writeField('F', out, Frame::TableField()); @@ -441,13 +439,13 @@ void Queue::consume(int options) void Queue::setConsumerTag(const QString &consumerTag) { Q_D(Queue); - d->consumerTag = consumerTag; + d->explicitConsumerTag = consumerTag; } QString Queue::consumerTag() const { Q_D(const Queue); - return d->consumerTag; + return d->explicitConsumerTag; } void Queue::get() diff --git a/src/amqp_queue_p.h b/src/amqp_queue_p.h index 7fc8605..0632f40 100644 --- a/src/amqp_queue_p.h +++ b/src/amqp_queue_p.h @@ -2,6 +2,8 @@ #define amqp_queue_p_h__ #include +#include + #include "amqp_channel_p.h" namespace QAMQP @@ -43,9 +45,11 @@ public: bool delayedDeclare; bool declared; bool noAck; - QString consumerTag; + QString explicitConsumerTag; + QStringList consumerTags; QQueue > delayedBindings; bool recievingMessage; + Message currentMessage; Q_DECLARE_PUBLIC(Queue)