refactor Message to be explicitly shared class, rather than passing

shared pointers around directly
This commit is contained in:
Matt Broadstone 2014-06-05 15:34:08 -04:00
parent 73a1d1db36
commit 01d792f38d
11 changed files with 127 additions and 77 deletions

View File

@ -1,15 +1,66 @@
#include "amqp_message.h" #include "amqp_message.h"
#include "amqp_message_p.h"
using namespace QAMQP; using namespace QAMQP;
MessagePrivate::MessagePrivate()
: deliveryTag(0),
leftSize(0)
{
}
//////////////////////////////////////////////////////////////////////////
Message::Message() Message::Message()
: d(new MessagePrivate)
{
}
Message::Message(const Message &other)
: d(other.d)
{ {
qDebug("Message create");
leftSize = 0;
deliveryTag = 0;
} }
Message::~Message() Message::~Message()
{ {
qDebug("Message release"); }
Message &Message::operator=(const Message &other)
{
d = other.d;
return *this;
}
qlonglong Message::deliveryTag() const
{
return d->deliveryTag;
}
bool Message::redelivered() const
{
return d->redelivered;
}
QString Message::exchangeName() const
{
return d->exchangeName;
}
QString Message::routingKey() const
{
return d->routingKey;
}
QByteArray Message::payload() const
{
return d->payload;
}
QHash<Message::MessageProperty, QVariant> Message::properties() const
{
return d->properties;
}
Frame::TableField Message::headers() const
{
return d->headers;
} }

View File

@ -3,7 +3,7 @@
#include <QByteArray> #include <QByteArray>
#include <QHash> #include <QHash>
#include <QSharedPointer> #include <QExplicitlySharedDataPointer>
#include "amqp_frame.h" #include "amqp_frame.h"
#include "amqp_global.h" #include "amqp_global.h"
@ -11,24 +11,32 @@
namespace QAMQP namespace QAMQP
{ {
struct QAMQP_EXPORT Message class MessagePrivate;
class QAMQP_EXPORT Message
{ {
public:
Message(); Message();
virtual ~Message(); Message(const Message &other);
Message &operator=(const Message &other);
~Message();
typedef Frame::Content::Property MessageProperty; typedef Frame::Content::Property MessageProperty;
Q_DECLARE_FLAGS(MessageProperties, MessageProperty) Q_DECLARE_FLAGS(MessageProperties, MessageProperty)
qlonglong deliveryTag; qlonglong deliveryTag() const;
QByteArray payload; bool redelivered() const;
QHash<MessageProperty, QVariant> property; QString exchangeName() const;
Frame::TableField headers; QString routingKey() const;
QString routeKey; QByteArray payload() const;
QString exchangeName; QHash<MessageProperty, QVariant> properties() const;
int leftSize; Frame::TableField headers() const;
};
typedef QSharedPointer<Message> MessagePtr; private:
QExplicitlySharedDataPointer<MessagePrivate> d;
friend class QueuePrivate;
friend class Queue;
};
} // namespace QAMQP } // namespace QAMQP

View File

@ -1,6 +1,7 @@
#include "amqp_queue.h" #include "amqp_queue.h"
#include "amqp_queue_p.h" #include "amqp_queue_p.h"
#include "amqp_exchange.h" #include "amqp_exchange.h"
#include "amqp_message_p.h"
using namespace QAMQP; using namespace QAMQP;
@ -86,12 +87,12 @@ void QueuePrivate::_q_content(const Frame::Content &frame)
return; return;
} }
MessagePtr &message = messages.last(); Message message = messages.last();
message->leftSize = frame.bodySize(); message.d->leftSize = frame.bodySize();
QHash<int, QVariant>::ConstIterator it; QHash<int, QVariant>::ConstIterator it;
QHash<int, QVariant>::ConstIterator itEnd = frame.properties_.constEnd(); QHash<int, QVariant>::ConstIterator itEnd = frame.properties_.constEnd();
for (it = frame.properties_.constBegin(); it != itEnd; ++it) for (it = frame.properties_.constBegin(); it != itEnd; ++it)
message->property[Message::MessageProperty(it.key())] = it.value(); message.d->properties[Message::MessageProperty(it.key())] = it.value();
} }
void QueuePrivate::_q_body(const Frame::ContentBody &frame) void QueuePrivate::_q_body(const Frame::ContentBody &frame)
@ -106,11 +107,10 @@ void QueuePrivate::_q_body(const Frame::ContentBody &frame)
return; return;
} }
MessagePtr &message = messages.last(); Message message = messages.last();
message->payload.append(frame.body()); message.d->payload.append(frame.body());
message->leftSize -= frame.body().size(); message.d->leftSize -= frame.body().size();
if (message.d->leftSize == 0 && messages.size() == 1)
if (message->leftSize == 0 && messages.size() == 1)
Q_EMIT q->messageReceived(q); Q_EMIT q->messageReceived(q);
} }
@ -169,18 +169,12 @@ void QueuePrivate::getOk(const Frame::Method &frame)
QByteArray data = frame.arguments(); QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly); QDataStream in(&data, QIODevice::ReadOnly);
qlonglong deliveryTag = Frame::readField('L',in).toLongLong(); Message message;
bool redelivered = Frame::readField('t',in).toBool(); message.d->deliveryTag = Frame::readField('L',in).toLongLong();
QString exchangeName = Frame::readField('s',in).toString(); message.d->redelivered = Frame::readField('t',in).toBool();
QString routingKey = Frame::readField('s',in).toString(); message.d->exchangeName = Frame::readField('s',in).toString();
message.d->routingKey = Frame::readField('s',in).toString();
Q_UNUSED(redelivered) messages.enqueue(message);
MessagePtr newMessage = MessagePtr(new Message);
newMessage->routeKey = routingKey;
newMessage->exchangeName = exchangeName;
newMessage->deliveryTag = deliveryTag;
messages.enqueue(newMessage);
} }
void QueuePrivate::consumeOk(const Frame::Method &frame) void QueuePrivate::consumeOk(const Frame::Method &frame)
@ -199,21 +193,17 @@ void QueuePrivate::deliver(const Frame::Method &frame)
QByteArray data = frame.arguments(); QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly); QDataStream in(&data, QIODevice::ReadOnly);
QString consumer_ = Frame::readField('s',in).toString(); QString consumer_ = Frame::readField('s',in).toString();
if (consumer_ != consumerTag) if (consumer_ != consumerTag) {
qDebug() << Q_FUNC_INFO << "invalid consumer tag: " << consumer_;
return; return;
}
qlonglong deliveryTag = Frame::readField('L',in).toLongLong(); Message message;
bool redelivered = Frame::readField('t',in).toBool(); message.d->deliveryTag = Frame::readField('L',in).toLongLong();
QString exchangeName = Frame::readField('s',in).toString(); message.d->redelivered = Frame::readField('t',in).toBool();
QString routingKey = Frame::readField('s',in).toString(); message.d->exchangeName = Frame::readField('s',in).toString();
message.d->routingKey = Frame::readField('s',in).toString();
Q_UNUSED(redelivered) messages.enqueue(message);
MessagePtr newMessage = MessagePtr(new Message);
newMessage->routeKey = routingKey;
newMessage->exchangeName = exchangeName;
newMessage->deliveryTag = deliveryTag;
messages.enqueue(newMessage);
} }
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
@ -285,7 +275,7 @@ void Queue::declare(const QString &name, QueueOptions options)
QByteArray arguments; QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly); QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserver 1 out << qint16(0); //reserved 1
Frame::writeField('s', out, d->name); Frame::writeField('s', out, d->name);
out << qint8(options); out << qint8(options);
@ -310,7 +300,7 @@ void Queue::remove(bool ifUnused, bool ifEmpty, bool noWait)
QByteArray arguments; QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly); QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserver 1 out << qint16(0); //reserved 1
Frame::writeField('s', out, d->name); Frame::writeField('s', out, d->name);
qint8 flag = 0; qint8 flag = 0;
@ -336,7 +326,7 @@ void Queue::purge()
QByteArray arguments; QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly); QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserver 1 out << qint16(0); //reserved 1
Frame::writeField('s', out, d->name); Frame::writeField('s', out, d->name);
out << qint8(0); // no-wait out << qint8(0); // no-wait
@ -369,7 +359,7 @@ void Queue::bind(const QString &exchangeName, const QString &key)
QByteArray arguments; QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly); QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserver 1 out << qint16(0); //reserved 1
Frame::writeField('s', out, d->name); Frame::writeField('s', out, d->name);
Frame::writeField('s', out, exchangeName); Frame::writeField('s', out, exchangeName);
Frame::writeField('s', out, key); Frame::writeField('s', out, key);
@ -404,7 +394,7 @@ void Queue::unbind(const QString &exchangeName, const QString &key)
QByteArray arguments; QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly); QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserver 1 out << qint16(0); //reserved 1
Frame::writeField('s', out, d->name); Frame::writeField('s', out, d->name);
Frame::writeField('s', out, exchangeName); Frame::writeField('s', out, exchangeName);
Frame::writeField('s', out, key); Frame::writeField('s', out, key);
@ -414,7 +404,7 @@ void Queue::unbind(const QString &exchangeName, const QString &key)
d->sendFrame(frame); d->sendFrame(frame);
} }
MessagePtr Queue::getMessage() Message Queue::getMessage()
{ {
Q_D(Queue); Q_D(Queue);
return d->messages.dequeue(); return d->messages.dequeue();
@ -426,8 +416,8 @@ bool Queue::hasMessage() const
if (d->messages.isEmpty()) if (d->messages.isEmpty())
return false; return false;
const MessagePtr &q = d->messages.head(); const Message &message = d->messages.head();
return q->leftSize == 0; return message.d->leftSize == 0;
} }
void Queue::consume(ConsumeOptions options) void Queue::consume(ConsumeOptions options)
@ -444,7 +434,7 @@ void Queue::consume(ConsumeOptions options)
QByteArray arguments; QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly); QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserver 1 out << qint16(0); //reserved 1
Frame::writeField('s', out, d->name); Frame::writeField('s', out, d->name);
Frame::writeField('s', out, d->consumerTag); Frame::writeField('s', out, d->consumerTag);
@ -481,16 +471,16 @@ void Queue::get()
QByteArray arguments; QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly); QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserver 1 out << qint16(0); //reserved 1
Frame::writeField('s', out, d->name); Frame::writeField('s', out, d->name);
out << qint8(d->noAck ? 1 : 0); // noAck out << qint8(d->noAck ? 1 : 0); // noAck
frame.setArguments(arguments); frame.setArguments(arguments);
d->sendFrame(frame); d->sendFrame(frame);
} }
void Queue::ack(const MessagePtr &message) void Queue::ack(const Message &message)
{ {
Q_D(Queue); Q_D(Queue);
if (!d->opened) { if (!d->opened) {
@ -504,8 +494,8 @@ void Queue::ack(const MessagePtr &message)
QByteArray arguments; QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly); QDataStream out(&arguments, QIODevice::WriteOnly);
out << message->deliveryTag; //reserver 1 out << message.deliveryTag(); // reserved 1
out << qint8(0); // noAck out << qint8(0); // noAck
frame.setArguments(arguments); frame.setArguments(arguments);
d->sendFrame(frame); d->sendFrame(frame);

View File

@ -43,7 +43,7 @@ public:
QueueOptions option() const; QueueOptions option() const;
bool hasMessage() const; bool hasMessage() const;
MessagePtr getMessage(); Message getMessage();
void setConsumerTag(const QString &consumerTag); void setConsumerTag(const QString &consumerTag);
QString consumerTag() const; QString consumerTag() const;
@ -64,7 +64,7 @@ public:
// AMQP Basic // AMQP Basic
void consume(ConsumeOptions options = ConsumeOptions(NoOptions)); void consume(ConsumeOptions options = ConsumeOptions(NoOptions));
void get(); void get();
void ack(const MessagePtr &message); void ack(const Message &message);
Q_SIGNALS: Q_SIGNALS:
void declared(); void declared();

View File

@ -42,7 +42,7 @@ public:
bool noAck; bool noAck;
QString consumerTag; QString consumerTag;
QQueue<QPair<QString, QString> > delayedBindings; QQueue<QPair<QString, QString> > delayedBindings;
QQueue<MessagePtr> messages; QQueue<Message> messages;
bool recievingMessage; bool recievingMessage;
Q_DECLARE_PUBLIC(Queue) Q_DECLARE_PUBLIC(Queue)

View File

@ -14,6 +14,7 @@ PRIVATE_HEADERS += \
amqp_channel_p.h \ amqp_channel_p.h \
amqp_client_p.h \ amqp_client_p.h \
amqp_exchange_p.h \ amqp_exchange_p.h \
amqp_message_p.h \
amqp_queue_p.h amqp_queue_p.h
INSTALL_HEADERS += \ INSTALL_HEADERS += \

View File

@ -28,8 +28,8 @@ void tst_QAMQPQueue::defaultExchange()
Exchange *defaultExchange = client.createExchange(); Exchange *defaultExchange = client.createExchange();
defaultExchange->publish("test-default-exchange", "first message"); defaultExchange->publish("test-default-exchange", "first message");
QVERIFY(waitForSignal(queue, SIGNAL(messageReceived(Queue*)))); QVERIFY(waitForSignal(queue, SIGNAL(messageReceived(Queue*))));
MessagePtr message = queue->getMessage(); Message message = queue->getMessage();
QCOMPARE(message->payload, QByteArray("first message")); QCOMPARE(message.payload(), QByteArray("first message"));
client.disconnectFromHost(); client.disconnectFromHost();
QVERIFY(waitForSignal(&client, SIGNAL(disconnected()))); QVERIFY(waitForSignal(&client, SIGNAL(disconnected())));

View File

@ -53,8 +53,8 @@ protected slots:
void newMessage() void newMessage()
{ {
// Retrieve message // Retrieve message
QAMQP::MessagePtr message = queue_->getMessage(); QAMQP::Message message = queue_->getMessage();
qDebug() << "ReceiveLog::newMessage " << message->payload; qDebug() << "ReceiveLog::newMessage " << message.payload();
} }
private: private:

View File

@ -60,8 +60,8 @@ protected slots:
void newMessage() void newMessage()
{ {
// Retrieve message // Retrieve message
QAMQP::MessagePtr message = queue_->getMessage(); QAMQP::Message message = queue_->getMessage();
qDebug() << "ReceiveLogDirect::newMessage " << message->payload; qDebug() << "ReceiveLogDirect::newMessage " << message.payload();
} }
private: private:

View File

@ -52,8 +52,8 @@ protected slots:
{ {
while (queue_->hasMessage()) while (queue_->hasMessage())
{ {
QAMQP::MessagePtr message = queue_->getMessage(); QAMQP::Message message = queue_->getMessage();
qDebug() << "Receive::newMessage " << message->payload; qDebug() << "Receive::newMessage " << message.payload();
if(!queue_->noAck()) if(!queue_->noAck())
{ {
queue_->ack(message); queue_->ack(message);

View File

@ -49,11 +49,11 @@ protected slots:
void newMessage() void newMessage()
{ {
// Retrieve message // Retrieve message
QAMQP::MessagePtr message = queue_->getMessage(); QAMQP::Message message = queue_->getMessage();
qDebug() << "Worker::newMessage " << message->payload; qDebug() << "Worker::newMessage " << message.payload();
// Simulate long processing // Simulate long processing
int wait = message->payload.count('.'); int wait = message.payload().count('.');
QTime dieTime = QTime::currentTime().addMSecs(400 * wait); QTime dieTime = QTime::currentTime().addMSecs(400 * wait);
while( QTime::currentTime() < dieTime ); while( QTime::currentTime() < dieTime );