fix delayed declarations for different exchange types

This commit is contained in:
Matt Broadstone 2014-06-03 16:33:43 -04:00
parent d4b1824b46
commit 983eb60f76
2 changed files with 106 additions and 98 deletions

View File

@ -8,6 +8,108 @@
using namespace QAMQP;
QString ExchangePrivate::typeToString(Exchange::ExchangeType type)
{
switch (type) {
case Exchange::Direct: return QLatin1String("direct");
case Exchange::FanOut: return QLatin1String("fanout");
case Exchange::Topic: return QLatin1String("topic");
case Exchange::Headers: return QLatin1String("headers");
}
return QLatin1String("direct");
}
ExchangePrivate::ExchangePrivate(Exchange *q)
: ChannelPrivate(q),
delayedDeclare(false),
declared(false)
{
}
void ExchangePrivate::declare()
{
if (!opened) {
delayedDeclare = true;
return;
}
if (name.isEmpty()) {
qDebug() << Q_FUNC_INFO << "attempting to declare an unnamed exchange, aborting...";
return;
}
Frame::Method frame(Frame::fcExchange, ExchangePrivate::miDeclare);
frame.setChannel(number);
QByteArray args;
QDataStream stream(&args, QIODevice::WriteOnly);
stream << qint16(0); //reserved 1
Frame::writeField('s', stream, name);
Frame::writeField('s', stream, type);
stream << qint8(options);
Frame::writeField('F', stream, arguments);
frame.setArguments(args);
sendFrame(frame);
delayedDeclare = false;
}
bool ExchangePrivate::_q_method(const Frame::Method &frame)
{
if (ChannelPrivate::_q_method(frame))
return true;
if (frame.methodClass() != Frame::fcExchange)
return false;
switch(frame.id()) {
case miDeclareOk:
declareOk(frame);
break;
case miDeleteOk:
deleteOk(frame);
break;
default:
break;
}
return true;
}
void ExchangePrivate::declareOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(Exchange);
qDebug() << "Declared exchange: " << name;
declared = true;
Q_EMIT q->declared();
}
void ExchangePrivate::deleteOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(Exchange);
qDebug() << "Deleted exchange: " << name;
declared = false;
Q_EMIT q->removed();
}
void ExchangePrivate::_q_disconnected()
{
ChannelPrivate::_q_disconnected();
qDebug() << "Exchange " << name << " disconnected";
delayedDeclare = false;
declared = false;
}
//////////////////////////////////////////////////////////////////////////
Exchange::Exchange(int channelNumber, Client *parent)
: Channel(new ExchangePrivate(this), parent)
{
@ -24,7 +126,7 @@ void Exchange::channelOpened()
{
Q_D(Exchange);
if (d->delayedDeclare)
declare(Exchange::Direct);
d->declare();
}
void Exchange::channelClosed()
@ -55,33 +157,7 @@ void Exchange::declare(const QString &type, ExchangeOptions options , const Fram
d->type = type;
d->options = options;
d->arguments = args;
if (!d->opened) {
d->delayedDeclare = true;
return;
}
if (d->name.isEmpty()) {
qDebug() << Q_FUNC_INFO << "attempting to declare an unnamed exchange, aborting...";
return;
}
Frame::Method frame(Frame::fcExchange, ExchangePrivate::miDeclare);
frame.setChannel(d->number);
QByteArray arguments;
QDataStream stream(&arguments, QIODevice::WriteOnly);
stream << qint16(0); //reserver 1
Frame::writeField('s', stream, d->name);
Frame::writeField('s', stream, d->type);
stream << qint8(d->options);
Frame::writeField('F', stream, d->arguments);
frame.setArguments(arguments);
d->sendFrame(frame);
d->delayedDeclare = false;
d->declare();
}
void Exchange::remove(bool ifUnused, bool noWait)
@ -179,73 +255,3 @@ void Exchange::publish(const QByteArray &message, const QString &key,
}
}
//////////////////////////////////////////////////////////////////////////
QString ExchangePrivate::typeToString(Exchange::ExchangeType type)
{
switch (type) {
case Exchange::Direct: return QLatin1String("direct");
case Exchange::FanOut: return QLatin1String("fanout");
case Exchange::Topic: return QLatin1String("topic");
case Exchange::Headers: return QLatin1String("headers");
}
return QLatin1String("direct");
}
ExchangePrivate::ExchangePrivate(Exchange *q)
: ChannelPrivate(q),
delayedDeclare(false),
declared(false)
{
}
bool ExchangePrivate::_q_method(const Frame::Method &frame)
{
if (ChannelPrivate::_q_method(frame))
return true;
if (frame.methodClass() != Frame::fcExchange)
return false;
switch(frame.id()) {
case miDeclareOk:
declareOk(frame);
break;
case miDeleteOk:
deleteOk(frame);
break;
default:
break;
}
return true;
}
void ExchangePrivate::declareOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(Exchange);
qDebug() << "Declared exchange: " << name;
declared = true;
Q_EMIT q->declared();
}
void ExchangePrivate::deleteOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(Exchange);
qDebug() << "Deleted exchange: " << name;
declared = false;
Q_EMIT q->removed();
}
void ExchangePrivate::_q_disconnected()
{
ChannelPrivate::_q_disconnected();
qDebug() << "Exchange " << name << " disconnected";
delayedDeclare = false;
declared = false;
}

View File

@ -18,6 +18,8 @@ public:
ExchangePrivate(Exchange *q);
static QString typeToString(Exchange::ExchangeType type);
void declare();
// method handler related
virtual void _q_disconnected();
virtual bool _q_method(const Frame::Method &frame);