convert QAMQP::Queue to a proper QQueue subclass, move MessageProperties to QAMQP namespace

This commit is contained in:
Matt Broadstone 2014-06-10 09:05:42 -04:00
parent 55da181788
commit 4422924219
12 changed files with 28 additions and 45 deletions

View File

@ -196,7 +196,7 @@ void Exchange::publish(const QString &key, const QByteArray &message,
void Exchange::publish(const QString &key, const QByteArray &message, void Exchange::publish(const QString &key, const QByteArray &message,
const QString &mimeType, const QVariantHash &headers, const QString &mimeType, const QVariantHash &headers,
const Exchange::MessageProperties &properties) const MessageProperties &properties)
{ {
Q_D(Exchange); Q_D(Exchange);
Frame::Method frame(Frame::fcBasic, ExchangePrivate::bmPublish); Frame::Method frame(Frame::fcBasic, ExchangePrivate::bmPublish);
@ -220,8 +220,8 @@ void Exchange::publish(const QString &key, const QByteArray &message,
content.setProperty(Frame::Content::cpHeaders, headers); content.setProperty(Frame::Content::cpHeaders, headers);
content.setProperty(Frame::Content::cpMessageId, "0"); content.setProperty(Frame::Content::cpMessageId, "0");
Exchange::MessageProperties::ConstIterator it; MessageProperties::ConstIterator it;
Exchange::MessageProperties::ConstIterator itEnd = properties.constEnd(); MessageProperties::ConstIterator itEnd = properties.constEnd();
for (it = properties.constBegin(); it != itEnd; ++it) for (it = properties.constBegin(); it != itEnd; ++it)
content.setProperty(it.key(), it.value()); content.setProperty(it.key(), it.value());
content.setBody(message); content.setBody(message);

View File

@ -36,7 +36,6 @@ public:
}; };
Q_DECLARE_FLAGS(ExchangeOptions, ExchangeOption) Q_DECLARE_FLAGS(ExchangeOptions, ExchangeOption)
typedef QHash<Frame::Content::Property, QVariant> MessageProperties;
ExchangeOptions option() const; ExchangeOptions option() const;
virtual ~Exchange(); virtual ~Exchange();
@ -57,7 +56,7 @@ public:
const QString &mimeType, const MessageProperties &properties = MessageProperties()); const QString &mimeType, const MessageProperties &properties = MessageProperties());
void publish(const QString &key, const QByteArray &message, void publish(const QString &key, const QByteArray &message,
const QString &mimeType, const QVariantHash &headers, const QString &mimeType, const QVariantHash &headers,
const Exchange::MessageProperties &properties = Exchange::MessageProperties()); const MessageProperties &properties = MessageProperties());
Q_SIGNALS: Q_SIGNALS:
void declared(); void declared();

View File

@ -418,6 +418,8 @@ namespace Frame
} // namespace Frame } // namespace Frame
typedef QHash<Frame::Content::Property, QVariant> MessageProperties;
} // namespace QAMQP } // namespace QAMQP
Q_DECLARE_METATYPE(QAMQP::Frame::decimal) Q_DECLARE_METATYPE(QAMQP::Frame::decimal)

View File

@ -78,16 +78,17 @@ bool QueuePrivate::_q_method(const Frame::Method &frame)
void QueuePrivate::_q_content(const Frame::Content &frame) void QueuePrivate::_q_content(const Frame::Content &frame)
{ {
Q_Q(Queue);
Q_ASSERT(frame.channel() == channelNumber); Q_ASSERT(frame.channel() == channelNumber);
if (frame.channel() != channelNumber) if (frame.channel() != channelNumber)
return; return;
if (messages.isEmpty()) { if (q->isEmpty()) {
qErrnoWarning("Received content-header without method frame before"); qErrnoWarning("Received content-header without method frame before");
return; return;
} }
Message message = messages.last(); Message message = q->last();
message.d->leftSize = frame.bodySize(); message.d->leftSize = frame.bodySize();
QHash<int, QVariant>::ConstIterator it; QHash<int, QVariant>::ConstIterator it;
QHash<int, QVariant>::ConstIterator itEnd = frame.properties_.constEnd(); QHash<int, QVariant>::ConstIterator itEnd = frame.properties_.constEnd();
@ -102,15 +103,15 @@ void QueuePrivate::_q_body(const Frame::ContentBody &frame)
if (frame.channel() != channelNumber) if (frame.channel() != channelNumber)
return; return;
if (messages.isEmpty()) { if (q->isEmpty()) {
qErrnoWarning("Received content-body without method frame before"); qErrnoWarning("Received content-body without method frame before");
return; return;
} }
Message message = messages.last(); Message message = q->last();
message.d->payload.append(frame.body()); message.d->payload.append(frame.body());
message.d->leftSize -= frame.body().size(); message.d->leftSize -= frame.body().size();
if (message.d->leftSize == 0 && messages.size() == 1) if (message.d->leftSize == 0 && q->size() == 1)
Q_EMIT q->messageReceived(); Q_EMIT q->messageReceived();
} }
@ -166,6 +167,7 @@ void QueuePrivate::unbindOk(const Frame::Method &frame)
void QueuePrivate::getOk(const Frame::Method &frame) void QueuePrivate::getOk(const Frame::Method &frame)
{ {
Q_Q(Queue);
QByteArray data = frame.arguments(); QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly); QDataStream in(&data, QIODevice::ReadOnly);
@ -174,7 +176,7 @@ void QueuePrivate::getOk(const Frame::Method &frame)
message.d->redelivered = Frame::readField('t',in).toBool(); message.d->redelivered = Frame::readField('t',in).toBool();
message.d->exchangeName = Frame::readField('s',in).toString(); message.d->exchangeName = Frame::readField('s',in).toString();
message.d->routingKey = Frame::readField('s',in).toString(); message.d->routingKey = Frame::readField('s',in).toString();
messages.enqueue(message); q->enqueue(message);
} }
void QueuePrivate::consumeOk(const Frame::Method &frame) void QueuePrivate::consumeOk(const Frame::Method &frame)
@ -188,6 +190,7 @@ void QueuePrivate::consumeOk(const Frame::Method &frame)
void QueuePrivate::deliver(const Frame::Method &frame) void QueuePrivate::deliver(const Frame::Method &frame)
{ {
Q_Q(Queue);
qAmqpDebug() << Q_FUNC_INFO; qAmqpDebug() << Q_FUNC_INFO;
QByteArray data = frame.arguments(); QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly); QDataStream in(&data, QIODevice::ReadOnly);
@ -202,7 +205,7 @@ void QueuePrivate::deliver(const Frame::Method &frame)
message.d->redelivered = Frame::readField('t',in).toBool(); message.d->redelivered = Frame::readField('t',in).toBool();
message.d->exchangeName = Frame::readField('s',in).toString(); message.d->exchangeName = Frame::readField('s',in).toString();
message.d->routingKey = Frame::readField('s',in).toString(); message.d->routingKey = Frame::readField('s',in).toString();
messages.enqueue(message); q->enqueue(message);
} }
void QueuePrivate::declare() void QueuePrivate::declare()
@ -399,22 +402,6 @@ void Queue::unbind(const QString &exchangeName, const QString &key)
d->sendFrame(frame); d->sendFrame(frame);
} }
Message Queue::getMessage()
{
Q_D(Queue);
return d->messages.dequeue();
}
bool Queue::hasMessage() const
{
Q_D(const Queue);
if (d->messages.isEmpty())
return false;
const Message &message = d->messages.head();
return message.d->leftSize == 0;
}
void Queue::consume(int options) void Queue::consume(int options)
{ {
Q_D(Queue); Q_D(Queue);

View File

@ -1,6 +1,8 @@
#ifndef amqp_queue_h__ #ifndef amqp_queue_h__
#define amqp_queue_h__ #define amqp_queue_h__
#include <QQueue>
#include "amqp_channel.h" #include "amqp_channel.h"
#include "amqp_message.h" #include "amqp_message.h"
#include "amqp_global.h" #include "amqp_global.h"
@ -12,7 +14,7 @@ class Client;
class ClientPrivate; class ClientPrivate;
class Exchange; class Exchange;
class QueuePrivate; class QueuePrivate;
class QAMQP_EXPORT Queue : public Channel class QAMQP_EXPORT Queue : public Channel, public QQueue<Message>
{ {
Q_OBJECT Q_OBJECT
Q_ENUMS(QueueOptions) Q_ENUMS(QueueOptions)
@ -52,9 +54,6 @@ public:
~Queue(); ~Queue();
bool hasMessage() const;
Message getMessage();
void setConsumerTag(const QString &consumerTag); void setConsumerTag(const QString &consumerTag);
QString consumerTag() const; QString consumerTag() const;

View File

@ -44,7 +44,6 @@ public:
bool noAck; bool noAck;
QString consumerTag; QString consumerTag;
QQueue<QPair<QString, QString> > delayedBindings; QQueue<QPair<QString, QString> > delayedBindings;
QQueue<Message> messages;
bool recievingMessage; bool recievingMessage;
Q_DECLARE_PUBLIC(Queue) Q_DECLARE_PUBLIC(Queue)

View File

@ -60,7 +60,7 @@ void tst_QAMQPQueue::defaultExchange()
Exchange *defaultExchange = client->createExchange(); Exchange *defaultExchange = client->createExchange();
defaultExchange->publish("test-default-exchange", "first message"); defaultExchange->publish("test-default-exchange", "first message");
QVERIFY(waitForSignal(queue, SIGNAL(messageReceived()))); QVERIFY(waitForSignal(queue, SIGNAL(messageReceived())));
Message message = queue->getMessage(); Message message = queue->dequeue();
QCOMPARE(message.payload(), QByteArray("first message")); QCOMPARE(message.payload(), QByteArray("first message"));
} }
@ -93,7 +93,7 @@ void tst_QAMQPQueue::standardExchanges()
Exchange *defaultExchange = client->createExchange(exchange); Exchange *defaultExchange = client->createExchange(exchange);
defaultExchange->publish(routingKey, "test message"); defaultExchange->publish(routingKey, "test message");
QVERIFY(waitForSignal(queue, SIGNAL(messageReceived()))); QVERIFY(waitForSignal(queue, SIGNAL(messageReceived())));
QCOMPARE(queue->getMessage().payload(), QByteArray("test message")); QCOMPARE(queue->dequeue().payload(), QByteArray("test message"));
} }
void tst_QAMQPQueue::unnamed() void tst_QAMQPQueue::unnamed()

View File

@ -53,7 +53,7 @@ protected slots:
void newMessage() void newMessage()
{ {
// Retrieve message // Retrieve message
QAMQP::Message message = queue_->getMessage(); QAMQP::Message message = queue_->dequeue();
qDebug() << "ReceiveLog::newMessage " << message.payload(); qDebug() << "ReceiveLog::newMessage " << message.payload();
} }

View File

@ -60,7 +60,7 @@ protected slots:
void newMessage() void newMessage()
{ {
// Retrieve message // Retrieve message
QAMQP::Message message = queue_->getMessage(); QAMQP::Message message = queue_->dequeue();
qDebug() << "ReceiveLogDirect::newMessage " << message.payload(); qDebug() << "ReceiveLogDirect::newMessage " << message.payload();
} }

View File

@ -50,14 +50,11 @@ protected slots:
void newMessage() void newMessage()
{ {
while (queue_->hasMessage()) while (!queue_->isEmpty()) {
{ QAMQP::Message message = queue_->dequeue();
QAMQP::Message message = queue_->getMessage();
qDebug() << "Receive::newMessage " << message.payload(); qDebug() << "Receive::newMessage " << message.payload();
if(!queue_->noAck()) if (!queue_->noAck())
{
queue_->ack(message); queue_->ack(message);
}
} }
} }

View File

@ -52,7 +52,7 @@ protected slots:
{ {
static quint64 counter = 0; static quint64 counter = 0;
QAMQP::Exchange::MessageProperties properties; QAMQP::MessageProperties properties;
properties[QAMQP::Frame::Content::cpDeliveryMode] = 2; // Make message persistent properties[QAMQP::Frame::Content::cpDeliveryMode] = 2; // Make message persistent
QString message(QString("[%1: %2] Hello World! %3") QString message(QString("[%1: %2] Hello World! %3")

View File

@ -49,7 +49,7 @@ protected slots:
void newMessage() void newMessage()
{ {
// Retrieve message // Retrieve message
QAMQP::Message message = queue_->getMessage(); QAMQP::Message message = queue_->dequeue();
qDebug() << "Worker::newMessage " << message.payload(); qDebug() << "Worker::newMessage " << message.payload();
// Simulate long processing // Simulate long processing