diff --git a/src/amqp_exchange.cpp b/src/amqp_exchange.cpp index f10dde3..b8b94aa 100644 --- a/src/amqp_exchange.cpp +++ b/src/amqp_exchange.cpp @@ -8,6 +8,108 @@ using namespace QAMQP; + +QString ExchangePrivate::typeToString(Exchange::ExchangeType type) +{ + switch (type) { + case Exchange::Direct: return QLatin1String("direct"); + case Exchange::FanOut: return QLatin1String("fanout"); + case Exchange::Topic: return QLatin1String("topic"); + case Exchange::Headers: return QLatin1String("headers"); + } + + return QLatin1String("direct"); +} + +ExchangePrivate::ExchangePrivate(Exchange *q) + : ChannelPrivate(q), + delayedDeclare(false), + declared(false) +{ +} + +void ExchangePrivate::declare() +{ + if (!opened) { + delayedDeclare = true; + return; + } + + if (name.isEmpty()) { + qDebug() << Q_FUNC_INFO << "attempting to declare an unnamed exchange, aborting..."; + return; + } + + Frame::Method frame(Frame::fcExchange, ExchangePrivate::miDeclare); + frame.setChannel(number); + + QByteArray args; + QDataStream stream(&args, QIODevice::WriteOnly); + + stream << qint16(0); //reserved 1 + Frame::writeField('s', stream, name); + Frame::writeField('s', stream, type); + + stream << qint8(options); + Frame::writeField('F', stream, arguments); + + frame.setArguments(args); + sendFrame(frame); + delayedDeclare = false; +} + +bool ExchangePrivate::_q_method(const Frame::Method &frame) +{ + if (ChannelPrivate::_q_method(frame)) + return true; + + if (frame.methodClass() != Frame::fcExchange) + return false; + + switch(frame.id()) { + case miDeclareOk: + declareOk(frame); + break; + case miDeleteOk: + deleteOk(frame); + break; + default: + break; + } + + return true; +} + +void ExchangePrivate::declareOk(const Frame::Method &frame) +{ + Q_UNUSED(frame) + + Q_Q(Exchange); + qDebug() << "Declared exchange: " << name; + declared = true; + Q_EMIT q->declared(); +} + +void ExchangePrivate::deleteOk(const Frame::Method &frame) +{ + Q_UNUSED(frame) + + Q_Q(Exchange); + qDebug() << "Deleted exchange: " << name; + declared = false; + Q_EMIT q->removed(); +} + +void ExchangePrivate::_q_disconnected() +{ + ChannelPrivate::_q_disconnected(); + qDebug() << "Exchange " << name << " disconnected"; + delayedDeclare = false; + declared = false; +} + +////////////////////////////////////////////////////////////////////////// + Exchange::Exchange(int channelNumber, Client *parent) : Channel(new ExchangePrivate(this), parent) { @@ -24,7 +126,7 @@ void Exchange::channelOpened() { Q_D(Exchange); if (d->delayedDeclare) - declare(Exchange::Direct); + d->declare(); } void Exchange::channelClosed() @@ -55,33 +157,7 @@ void Exchange::declare(const QString &type, ExchangeOptions options , const Fram d->type = type; d->options = options; d->arguments = args; - - if (!d->opened) { - d->delayedDeclare = true; - return; - } - - if (d->name.isEmpty()) { - qDebug() << Q_FUNC_INFO << "attempting to declare an unnamed exchange, aborting..."; - return; - } - - Frame::Method frame(Frame::fcExchange, ExchangePrivate::miDeclare); - frame.setChannel(d->number); - - QByteArray arguments; - QDataStream stream(&arguments, QIODevice::WriteOnly); - - stream << qint16(0); //reserver 1 - Frame::writeField('s', stream, d->name); - Frame::writeField('s', stream, d->type); - - stream << qint8(d->options); - Frame::writeField('F', stream, d->arguments); - - frame.setArguments(arguments); - d->sendFrame(frame); - d->delayedDeclare = false; + d->declare(); } void Exchange::remove(bool ifUnused, bool noWait) @@ -179,73 +255,3 @@ void Exchange::publish(const QByteArray &message, const QString &key, } } -////////////////////////////////////////////////////////////////////////// - -QString ExchangePrivate::typeToString(Exchange::ExchangeType type) -{ - switch (type) { - case Exchange::Direct: return QLatin1String("direct"); - case Exchange::FanOut: return QLatin1String("fanout"); - case Exchange::Topic: return QLatin1String("topic"); - case Exchange::Headers: return QLatin1String("headers"); - } - - return QLatin1String("direct"); -} - -ExchangePrivate::ExchangePrivate(Exchange *q) - : ChannelPrivate(q), - delayedDeclare(false), - declared(false) -{ -} - -bool ExchangePrivate::_q_method(const Frame::Method &frame) -{ - if (ChannelPrivate::_q_method(frame)) - return true; - - if (frame.methodClass() != Frame::fcExchange) - return false; - - switch(frame.id()) { - case miDeclareOk: - declareOk(frame); - break; - case miDeleteOk: - deleteOk(frame); - break; - default: - break; - } - - return true; -} - -void ExchangePrivate::declareOk(const Frame::Method &frame) -{ - Q_UNUSED(frame) - - Q_Q(Exchange); - qDebug() << "Declared exchange: " << name; - declared = true; - Q_EMIT q->declared(); -} - -void ExchangePrivate::deleteOk(const Frame::Method &frame) -{ - Q_UNUSED(frame) - - Q_Q(Exchange); - qDebug() << "Deleted exchange: " << name; - declared = false; - Q_EMIT q->removed(); -} - -void ExchangePrivate::_q_disconnected() -{ - ChannelPrivate::_q_disconnected(); - qDebug() << "Exchange " << name << " disconnected"; - delayedDeclare = false; - declared = false; -} diff --git a/src/amqp_exchange_p.h b/src/amqp_exchange_p.h index 75e864f..83e00f1 100644 --- a/src/amqp_exchange_p.h +++ b/src/amqp_exchange_p.h @@ -18,6 +18,8 @@ public: ExchangePrivate(Exchange *q); static QString typeToString(Exchange::ExchangeType type); + void declare(); + // method handler related virtual void _q_disconnected(); virtual bool _q_method(const Frame::Method &frame);