moved declare/remove from QueuePrivate to Queue

This commit is contained in:
Matt Broadstone 2014-05-30 15:36:11 -04:00
parent d8da64b373
commit 93287a4836
3 changed files with 60 additions and 77 deletions

View File

@ -25,7 +25,7 @@ void Queue::onOpen()
{ {
Q_D(Queue); Q_D(Queue);
if (d->delayedDeclare) if (d->delayedDeclare)
d->declare(); declare();
if (!d->delayedBindings.isEmpty()) { if (!d->delayedBindings.isEmpty()) {
typedef QPair<QString, QString> BindingPair; typedef QPair<QString, QString> BindingPair;
@ -37,8 +37,7 @@ void Queue::onOpen()
void Queue::onClose() void Queue::onClose()
{ {
Q_D(Queue); remove(true, true);
d->remove(true, true);
} }
Queue::QueueOptions Queue::option() const Queue::QueueOptions Queue::option() const
@ -59,24 +58,60 @@ bool Queue::noAck() const
return d->noAck; return d->noAck;
} }
void Queue::declare()
{
Q_D(Queue);
declare(d->name, QueueOptions(Durable | AutoDelete));
}
void Queue::declare(const QString &name, QueueOptions options) void Queue::declare(const QString &name, QueueOptions options)
{ {
Q_D(Queue); Q_D(Queue);
setName(name); if (!name.isEmpty())
d->name = name;
d->options = options; d->options = options;
d->declare();
if (!d->opened) {
d->delayedDeclare = true;
return;
}
Frame::Method frame(Frame::fcQueue, QueuePrivate::miDeclare);
frame.setChannel(d->number);
QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
Frame::writeField('s', out, d->name);
out << qint8(options);
Frame::writeField('F', out, Frame::TableField());
frame.setArguments(arguments);
d->sendFrame(frame);
d->delayedDeclare = false;
} }
void Queue::remove(bool ifUnused, bool ifEmpty, bool noWait) void Queue::remove(bool ifUnused, bool ifEmpty, bool noWait)
{ {
Q_D(Queue); Q_D(Queue);
d->remove(ifUnused, ifEmpty, noWait); if (!d->declared) {
qDebug() << Q_FUNC_INFO << "trying to remove undeclared queue, aborting...";
return;
}
Frame::Method frame(Frame::fcQueue, QueuePrivate::miDelete);
frame.setChannel(d->number);
QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
Frame::writeField('s', out, d->name);
qint8 flag = 0;
flag |= (ifUnused ? 0x1 : 0);
flag |= (ifEmpty ? 0x2 : 0);
flag |= (noWait ? 0x4 : 0);
out << flag;
frame.setArguments(arguments);
d->sendFrame(frame);
} }
void Queue::purge() void Queue::purge()
@ -177,12 +212,12 @@ void Queue::_q_content(const Frame::Content &frame)
if (frame.channel() != d->number) if (frame.channel() != d->number)
return; return;
if (d->messages_.isEmpty()) { if (d->messages.isEmpty()) {
qErrnoWarning("Received content-header without method frame before"); qErrnoWarning("Received content-header without method frame before");
return; return;
} }
MessagePtr &message = d->messages_.last(); MessagePtr &message = d->messages.last();
message->leftSize = frame.bodySize(); message->leftSize = frame.bodySize();
QHash<int, QVariant>::ConstIterator it; QHash<int, QVariant>::ConstIterator it;
QHash<int, QVariant>::ConstIterator itEnd = frame.properties_.constEnd(); QHash<int, QVariant>::ConstIterator itEnd = frame.properties_.constEnd();
@ -197,32 +232,32 @@ void Queue::_q_body(const Frame::ContentBody &frame)
if (frame.channel() != d->number) if (frame.channel() != d->number)
return; return;
if (d->messages_.isEmpty()) { if (d->messages.isEmpty()) {
qErrnoWarning("Received content-body without method frame before"); qErrnoWarning("Received content-body without method frame before");
return; return;
} }
MessagePtr &message = d->messages_.last(); MessagePtr &message = d->messages.last();
message->payload.append(frame.body()); message->payload.append(frame.body());
message->leftSize -= frame.body().size(); message->leftSize -= frame.body().size();
if (message->leftSize == 0 && d->messages_.size() == 1) if (message->leftSize == 0 && d->messages.size() == 1)
Q_EMIT messageReceived(this); Q_EMIT messageReceived(this);
} }
MessagePtr Queue::getMessage() MessagePtr Queue::getMessage()
{ {
Q_D(Queue); Q_D(Queue);
return d->messages_.dequeue(); return d->messages.dequeue();
} }
bool Queue::hasMessage() const bool Queue::hasMessage() const
{ {
Q_D(const Queue); Q_D(const Queue);
if (d->messages_.isEmpty()) if (d->messages.isEmpty())
return false; return false;
const MessagePtr &q = d->messages_.head(); const MessagePtr &q = d->messages.head();
return q->leftSize == 0; return q->leftSize == 0;
} }
@ -424,52 +459,6 @@ void QueuePrivate::unbindOk(const Frame::Method &frame)
QMetaObject::invokeMethod(q, "binded", Q_ARG(bool, false)); QMetaObject::invokeMethod(q, "binded", Q_ARG(bool, false));
} }
void QueuePrivate::declare()
{
if (!opened) {
delayedDeclare = true;
return;
}
Frame::Method frame(Frame::fcQueue, miDeclare);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
Frame::writeField('s', out, name);
out << qint8(options);
Frame::writeField('F', out, Frame::TableField());
frame.setArguments(arguments_);
sendFrame(frame);
delayedDeclare = false;
}
void QueuePrivate::remove(bool ifUnused, bool ifEmpty, bool noWait)
{
if (!declared)
return;
Frame::Method frame(Frame::fcQueue, miDelete);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
Frame::writeField('s', out, name);
qint8 flag = 0;
flag |= (ifUnused ? 0x1 : 0);
flag |= (ifEmpty ? 0x2 : 0);
flag |= (noWait ? 0x4 : 0);
out << flag;
frame.setArguments(arguments_);
sendFrame(frame);
}
void QueuePrivate::getOk(const Frame::Method &frame) void QueuePrivate::getOk(const Frame::Method &frame)
{ {
QByteArray data = frame.arguments(); QByteArray data = frame.arguments();
@ -486,7 +475,7 @@ void QueuePrivate::getOk(const Frame::Method &frame)
newMessage->routeKey = routingKey; newMessage->routeKey = routingKey;
newMessage->exchangeName = exchangeName; newMessage->exchangeName = exchangeName;
newMessage->deliveryTag = deliveryTag; newMessage->deliveryTag = deliveryTag;
messages_.enqueue(newMessage); messages.enqueue(newMessage);
} }
void QueuePrivate::consumeOk(const Frame::Method &frame) void QueuePrivate::consumeOk(const Frame::Method &frame)
@ -519,5 +508,5 @@ void QueuePrivate::deliver(const Frame::Method &frame)
newMessage->routeKey = routingKey; newMessage->routeKey = routingKey;
newMessage->exchangeName = exchangeName; newMessage->exchangeName = exchangeName;
newMessage->deliveryTag = deliveryTag; newMessage->deliveryTag = deliveryTag;
messages_.enqueue(newMessage); messages.enqueue(newMessage);
} }

View File

@ -44,8 +44,8 @@ public:
~Queue(); ~Queue();
QueueOptions option() const; QueueOptions option() const;
void declare(); void declare(const QString &name = QString(),
void declare(const QString &name, QueueOptions options); QueueOptions options = QueueOptions(Durable | AutoDelete));
void remove(bool ifUnused = true, bool ifEmpty = true, bool noWait = true); void remove(bool ifUnused = true, bool ifEmpty = true, bool noWait = true);
void purge(); void purge();

View File

@ -23,9 +23,6 @@ public:
QueuePrivate(Queue *q); QueuePrivate(Queue *q);
~QueuePrivate(); ~QueuePrivate();
void declare();
void remove(bool ifUnused = true, bool ifEmpty = true, bool noWait = true);
// method handler related // method handler related
virtual bool _q_method(const Frame::Method &frame); virtual bool _q_method(const Frame::Method &frame);
void declareOk(const Frame::Method &frame); void declareOk(const Frame::Method &frame);
@ -36,17 +33,14 @@ public:
void consumeOk(const Frame::Method &frame); void consumeOk(const Frame::Method &frame);
void deliver(const Frame::Method &frame); void deliver(const Frame::Method &frame);
QString type; QString type;
Queue::QueueOptions options; Queue::QueueOptions options;
bool delayedDeclare; bool delayedDeclare;
bool declared; bool declared;
bool noAck; bool noAck;
QString consumerTag; QString consumerTag;
QQueue<QPair<QString, QString> > delayedBindings; QQueue<QPair<QString, QString> > delayedBindings;
QQueue<MessagePtr> messages_; QQueue<MessagePtr> messages;
bool recievingMessage; bool recievingMessage;
Q_DECLARE_PUBLIC(Queue) Q_DECLARE_PUBLIC(Queue)