change the way messages are received
Messages used to be "delivered" and pushed right onto the queue, which is where the requirement for Message to be explicitly shared came from. Now messages that are incomplete come in and are stored in a "currentMessage" private member (there is a requirement that messages are delivered in this fashion, by the spec). Once the message body has come in, it is put into the local queue and the messageReceived signal is emitted. Also, modified Queue to be able to track a number of consumer tags. This was exposed by the "routing" tutorial, where a single consumer can bind to a number of exchanges with a generated consumer tag. The previous example did not run into this problem because it used an explicitly set consumer tag and therefore there were no conflicts.
This commit is contained in:
parent
bf21ebc246
commit
1ae43ef4bd
|
|
@ -75,22 +75,20 @@ bool QueuePrivate::_q_method(const Frame::Method &frame)
|
||||||
|
|
||||||
void QueuePrivate::_q_content(const Frame::Content &frame)
|
void QueuePrivate::_q_content(const Frame::Content &frame)
|
||||||
{
|
{
|
||||||
Q_Q(Queue);
|
|
||||||
Q_ASSERT(frame.channel() == channelNumber);
|
Q_ASSERT(frame.channel() == channelNumber);
|
||||||
if (frame.channel() != channelNumber)
|
if (frame.channel() != channelNumber)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (q->isEmpty()) {
|
if (!currentMessage.isValid()) {
|
||||||
qErrnoWarning("Received content-header without method frame before");
|
qAmqpDebug() << "received content-header without delivered message";
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Message message = q->last();
|
currentMessage.d->leftSize = frame.bodySize();
|
||||||
message.d->leftSize = frame.bodySize();
|
|
||||||
QHash<int, QVariant>::ConstIterator it;
|
QHash<int, QVariant>::ConstIterator it;
|
||||||
QHash<int, QVariant>::ConstIterator itEnd = frame.properties_.constEnd();
|
QHash<int, QVariant>::ConstIterator itEnd = frame.properties_.constEnd();
|
||||||
for (it = frame.properties_.constBegin(); it != itEnd; ++it)
|
for (it = frame.properties_.constBegin(); it != itEnd; ++it)
|
||||||
message.d->properties[MessageProperty(it.key())] = it.value();
|
currentMessage.d->properties[MessageProperty(it.key())] = it.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueuePrivate::_q_body(const Frame::ContentBody &frame)
|
void QueuePrivate::_q_body(const Frame::ContentBody &frame)
|
||||||
|
|
@ -100,22 +98,23 @@ void QueuePrivate::_q_body(const Frame::ContentBody &frame)
|
||||||
if (frame.channel() != channelNumber)
|
if (frame.channel() != channelNumber)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (q->isEmpty()) {
|
if (!currentMessage.isValid()) {
|
||||||
qErrnoWarning("Received content-body without method frame before");
|
qAmqpDebug() << "received content-body without delivered message";
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Message message = q->last();
|
currentMessage.d->payload.append(frame.body());
|
||||||
message.d->payload.append(frame.body());
|
currentMessage.d->leftSize -= frame.body().size();
|
||||||
message.d->leftSize -= frame.body().size();
|
if (currentMessage.d->leftSize == 0) {
|
||||||
if (message.d->leftSize == 0 && q->size() == 1)
|
q->enqueue(currentMessage);
|
||||||
Q_EMIT q->messageReceived();
|
Q_EMIT q->messageReceived();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueuePrivate::declareOk(const Frame::Method &frame)
|
void QueuePrivate::declareOk(const Frame::Method &frame)
|
||||||
{
|
{
|
||||||
Q_Q(Queue);
|
Q_Q(Queue);
|
||||||
qAmqpDebug() << "Declared queue: " << name;
|
qAmqpDebug() << "declared queue: " << name;
|
||||||
declared = true;
|
declared = true;
|
||||||
|
|
||||||
QByteArray data = frame.arguments();
|
QByteArray data = frame.arguments();
|
||||||
|
|
@ -124,7 +123,7 @@ void QueuePrivate::declareOk(const Frame::Method &frame)
|
||||||
name = Frame::readField('s', stream).toString();
|
name = Frame::readField('s', stream).toString();
|
||||||
qint32 messageCount = 0, consumerCount = 0;
|
qint32 messageCount = 0, consumerCount = 0;
|
||||||
stream >> messageCount >> consumerCount;
|
stream >> messageCount >> consumerCount;
|
||||||
qAmqpDebug("Message count %d\nConsumer count: %d", messageCount, consumerCount);
|
qAmqpDebug("message count %d\nConsumer count: %d", messageCount, consumerCount);
|
||||||
|
|
||||||
Q_EMIT q->declared();
|
Q_EMIT q->declared();
|
||||||
}
|
}
|
||||||
|
|
@ -172,13 +171,12 @@ void QueuePrivate::unbindOk(const Frame::Method &frame)
|
||||||
Q_UNUSED(frame)
|
Q_UNUSED(frame)
|
||||||
|
|
||||||
Q_Q(Queue);
|
Q_Q(Queue);
|
||||||
qAmqpDebug() << Q_FUNC_INFO << "unbound exchange";
|
qAmqpDebug() << Q_FUNC_INFO << "unbound from exchange";
|
||||||
Q_EMIT q->unbound();
|
Q_EMIT q->unbound();
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueuePrivate::getOk(const Frame::Method &frame)
|
void QueuePrivate::getOk(const Frame::Method &frame)
|
||||||
{
|
{
|
||||||
Q_Q(Queue);
|
|
||||||
QByteArray data = frame.arguments();
|
QByteArray data = frame.arguments();
|
||||||
QDataStream in(&data, QIODevice::ReadOnly);
|
QDataStream in(&data, QIODevice::ReadOnly);
|
||||||
|
|
||||||
|
|
@ -187,27 +185,27 @@ void QueuePrivate::getOk(const Frame::Method &frame)
|
||||||
message.d->redelivered = Frame::readField('t',in).toBool();
|
message.d->redelivered = Frame::readField('t',in).toBool();
|
||||||
message.d->exchangeName = Frame::readField('s',in).toString();
|
message.d->exchangeName = Frame::readField('s',in).toString();
|
||||||
message.d->routingKey = Frame::readField('s',in).toString();
|
message.d->routingKey = Frame::readField('s',in).toString();
|
||||||
q->enqueue(message);
|
currentMessage = message;
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueuePrivate::consumeOk(const Frame::Method &frame)
|
void QueuePrivate::consumeOk(const Frame::Method &frame)
|
||||||
{
|
{
|
||||||
qAmqpDebug() << "Consume ok: " << name;
|
qAmqpDebug() << "consume ok: " << name;
|
||||||
QByteArray data = frame.arguments();
|
QByteArray data = frame.arguments();
|
||||||
QDataStream stream(&data, QIODevice::ReadOnly);
|
QDataStream stream(&data, QIODevice::ReadOnly);
|
||||||
consumerTag = Frame::readField('s',stream).toString();
|
QString consumerTag = Frame::readField('s',stream).toString();
|
||||||
qAmqpDebug("Consumer tag = %s", qPrintable(consumerTag));
|
qAmqpDebug("consumer tag = %s", qPrintable(consumerTag));
|
||||||
|
consumerTags.append(consumerTag);
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueuePrivate::deliver(const Frame::Method &frame)
|
void QueuePrivate::deliver(const Frame::Method &frame)
|
||||||
{
|
{
|
||||||
Q_Q(Queue);
|
|
||||||
qAmqpDebug() << Q_FUNC_INFO;
|
qAmqpDebug() << Q_FUNC_INFO;
|
||||||
QByteArray data = frame.arguments();
|
QByteArray data = frame.arguments();
|
||||||
QDataStream in(&data, QIODevice::ReadOnly);
|
QDataStream in(&data, QIODevice::ReadOnly);
|
||||||
QString consumer_ = Frame::readField('s',in).toString();
|
QString consumer = Frame::readField('s',in).toString();
|
||||||
if (consumer_ != consumerTag) {
|
if (!consumerTags.contains(consumer)) {
|
||||||
qAmqpDebug() << Q_FUNC_INFO << "invalid consumer tag: " << consumer_;
|
qAmqpDebug() << Q_FUNC_INFO << "invalid consumer tag: " << consumer;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -216,7 +214,7 @@ void QueuePrivate::deliver(const Frame::Method &frame)
|
||||||
message.d->redelivered = Frame::readField('t',in).toBool();
|
message.d->redelivered = Frame::readField('t',in).toBool();
|
||||||
message.d->exchangeName = Frame::readField('s',in).toString();
|
message.d->exchangeName = Frame::readField('s',in).toString();
|
||||||
message.d->routingKey = Frame::readField('s',in).toString();
|
message.d->routingKey = Frame::readField('s',in).toString();
|
||||||
q->enqueue(message);
|
currentMessage = message;
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueuePrivate::declare()
|
void QueuePrivate::declare()
|
||||||
|
|
@ -429,7 +427,7 @@ void Queue::consume(int options)
|
||||||
|
|
||||||
out << qint16(0); //reserved 1
|
out << qint16(0); //reserved 1
|
||||||
Frame::writeField('s', out, d->name);
|
Frame::writeField('s', out, d->name);
|
||||||
Frame::writeField('s', out, d->consumerTag);
|
Frame::writeField('s', out, d->explicitConsumerTag);
|
||||||
|
|
||||||
out << qint8(options);
|
out << qint8(options);
|
||||||
Frame::writeField('F', out, Frame::TableField());
|
Frame::writeField('F', out, Frame::TableField());
|
||||||
|
|
@ -441,13 +439,13 @@ void Queue::consume(int options)
|
||||||
void Queue::setConsumerTag(const QString &consumerTag)
|
void Queue::setConsumerTag(const QString &consumerTag)
|
||||||
{
|
{
|
||||||
Q_D(Queue);
|
Q_D(Queue);
|
||||||
d->consumerTag = consumerTag;
|
d->explicitConsumerTag = consumerTag;
|
||||||
}
|
}
|
||||||
|
|
||||||
QString Queue::consumerTag() const
|
QString Queue::consumerTag() const
|
||||||
{
|
{
|
||||||
Q_D(const Queue);
|
Q_D(const Queue);
|
||||||
return d->consumerTag;
|
return d->explicitConsumerTag;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Queue::get()
|
void Queue::get()
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,8 @@
|
||||||
#define amqp_queue_p_h__
|
#define amqp_queue_p_h__
|
||||||
|
|
||||||
#include <QQueue>
|
#include <QQueue>
|
||||||
|
#include <QStringList>
|
||||||
|
|
||||||
#include "amqp_channel_p.h"
|
#include "amqp_channel_p.h"
|
||||||
|
|
||||||
namespace QAMQP
|
namespace QAMQP
|
||||||
|
|
@ -43,9 +45,11 @@ public:
|
||||||
bool delayedDeclare;
|
bool delayedDeclare;
|
||||||
bool declared;
|
bool declared;
|
||||||
bool noAck;
|
bool noAck;
|
||||||
QString consumerTag;
|
QString explicitConsumerTag;
|
||||||
|
QStringList consumerTags;
|
||||||
QQueue<QPair<QString, QString> > delayedBindings;
|
QQueue<QPair<QString, QString> > delayedBindings;
|
||||||
bool recievingMessage;
|
bool recievingMessage;
|
||||||
|
Message currentMessage;
|
||||||
|
|
||||||
Q_DECLARE_PUBLIC(Queue)
|
Q_DECLARE_PUBLIC(Queue)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue