diff --git a/src/amqp_authenticator.h b/src/amqp_authenticator.h index b2fd2d8..ee8cbc0 100644 --- a/src/amqp_authenticator.h +++ b/src/amqp_authenticator.h @@ -4,8 +4,6 @@ #include #include -#include "amqp_global.h" - namespace QAMQP { @@ -38,5 +36,6 @@ private: }; -} +} // namespace QAMQP + #endif // amqp_authenticator_h__ diff --git a/src/amqp_channel.cpp b/src/amqp_channel.cpp index 076d229..b5cbe84 100644 --- a/src/amqp_channel.cpp +++ b/src/amqp_channel.cpp @@ -1,11 +1,9 @@ #include "amqp_channel.h" #include "amqp_channel_p.h" -#include "amqp_connection_p.h" - #include "amqp_client.h" #include "amqp_client_p.h" +#include "amqp_connection_p.h" -#include #include #include @@ -13,59 +11,58 @@ using namespace QAMQP; ////////////////////////////////////////////////////////////////////////// -QAMQP::Channel::Channel(int channelNumber, Client *parent) +Channel::Channel(int channelNumber, Client *parent) : QObject(parent), d_ptr(new ChannelPrivate(this)) { - Q_D(QAMQP::Channel); + Q_D(Channel); d->init(channelNumber, parent); } -QAMQP::Channel::Channel(ChannelPrivate *dd, Client *parent) +Channel::Channel(ChannelPrivate *dd, Client *parent) : QObject(parent), d_ptr(dd) { } -QAMQP::Channel::~Channel() +Channel::~Channel() { } -void QAMQP::Channel::closeChannel() +void Channel::closeChannel() { - Q_D(QAMQP::Channel); + Q_D(Channel); d->needOpen = true; if (d->opened) d->close(0, QString(), 0,0); - } -void QAMQP::Channel::reopen() +void Channel::reopen() { - Q_D(QAMQP::Channel); + Q_D(Channel); closeChannel(); d->open(); } -QString QAMQP::Channel::name() const +QString Channel::name() const { - Q_D(const QAMQP::Channel); + Q_D(const Channel); return d->name; } -int QAMQP::Channel::channelNumber() const +int Channel::channelNumber() const { - Q_D(const QAMQP::Channel); + Q_D(const Channel); return d->number; } -void QAMQP::Channel::setName(const QString &name) +void Channel::setName(const QString &name) { - Q_D(QAMQP::Channel); + Q_D(Channel); d->name = name; } -void QAMQP::Channel::stateChanged(int state) +void Channel::stateChanged(int state) { switch(ChannelPrivate::State(state)) { case ChannelPrivate::csOpened: @@ -83,29 +80,29 @@ void QAMQP::Channel::stateChanged(int state) } } -void QAMQP::Channel::_q_method(const Frame::Method &frame) +void Channel::_q_method(const Frame::Method &frame) { - Q_D(QAMQP::Channel); + Q_D(Channel); d->_q_method(frame); } -bool QAMQP::Channel::isOpened() const +bool Channel::isOpened() const { - Q_D(const QAMQP::Channel); + Q_D(const Channel); return d->opened; } -void QAMQP::Channel::onOpen() +void Channel::onOpen() { } -void QAMQP::Channel::onClose() +void Channel::onClose() { } -void QAMQP::Channel::setQOS(qint32 prefetchSize, quint16 prefetchCount) +void Channel::setQOS(qint32 prefetchSize, quint16 prefetchCount) { - Q_D(QAMQP::Channel); + Q_D(Channel); d->setQOS(prefetchSize, prefetchCount); } @@ -126,7 +123,7 @@ ChannelPrivate::~ChannelPrivate() void ChannelPrivate::init(int channelNumber, Client *parent) { - Q_Q(QAMQP::Channel); + Q_Q(Channel); needOpen = channelNumber == -1 ? true : false; number = channelNumber == -1 ? ++nextChannelNumber_ : channelNumber; nextChannelNumber_ = qMax(channelNumber, (nextChannelNumber_ + 1)); @@ -134,14 +131,13 @@ void ChannelPrivate::init(int channelNumber, Client *parent) client_ = parent; } - -bool ChannelPrivate::_q_method(const QAMQP::Frame::Method &frame) +bool ChannelPrivate::_q_method(const Frame::Method &frame) { Q_ASSERT(frame.channel() == number); if (frame.channel() != number) return true; - if (frame.methodClass() != QAMQP::Frame::fcChannel) + if (frame.methodClass() != Frame::fcChannel) return false; qDebug("Channel#%d:", number); @@ -171,12 +167,10 @@ void ChannelPrivate::_q_open() open(); } - -void ChannelPrivate::sendFrame(const QAMQP::Frame::Base &frame) +void ChannelPrivate::sendFrame(const Frame::Base &frame) { - if (client_) { + if (client_) client_->d_func()->network_->sendFrame(frame); - } } void ChannelPrivate::open() @@ -188,7 +182,7 @@ void ChannelPrivate::open() return; qDebug("Open channel #%d", number); - QAMQP::Frame::Method frame(QAMQP::Frame::fcChannel, miOpen); + Frame::Method frame(Frame::fcChannel, miOpen); frame.setChannel(number); QByteArray arguments_; arguments_.resize(1); @@ -201,7 +195,7 @@ void ChannelPrivate::flow() { } -void ChannelPrivate::flow(const QAMQP::Frame::Method &frame) +void ChannelPrivate::flow(const Frame::Method &frame) { Q_UNUSED(frame); } @@ -210,21 +204,21 @@ void ChannelPrivate::flowOk() { } -void ChannelPrivate::flowOk(const QAMQP::Frame::Method &frame) +void ChannelPrivate::flowOk(const Frame::Method &frame) { Q_UNUSED(frame); } void ChannelPrivate::close(int code, const QString &text, int classId, int methodId) { - QAMQP::Frame::Method frame(QAMQP::Frame::fcChannel, miClose); + Frame::Method frame(Frame::fcChannel, miClose); QByteArray arguments_; QDataStream stream(&arguments_, QIODevice::WriteOnly); - QAMQP::Frame::writeField('s',stream, client_->virtualHost()); + Frame::writeField('s',stream, client_->virtualHost()); stream << qint16(code); - QAMQP::Frame::writeField('s', stream, text); + Frame::writeField('s', stream, text); stream << qint16(classId); stream << qint16(methodId); @@ -232,9 +226,9 @@ void ChannelPrivate::close(int code, const QString &text, int classId, int metho client_->d_func()->network_->sendFrame(frame); } -void ChannelPrivate::close(const QAMQP::Frame::Method &frame) +void ChannelPrivate::close(const Frame::Method &frame) { - Q_Q(QAMQP::Channel); + Q_Q(Channel); q->stateChanged(csClosed); qDebug(">> CLOSE"); @@ -242,7 +236,7 @@ void ChannelPrivate::close(const QAMQP::Frame::Method &frame) QDataStream stream(&data, QIODevice::ReadOnly); qint16 code_ = 0, classId, methodId; stream >> code_; - QString text(QAMQP::Frame::readField('s', stream).toString()); + QString text(Frame::readField('s', stream).toString()); stream >> classId; stream >> methodId; @@ -254,24 +248,24 @@ void ChannelPrivate::close(const QAMQP::Frame::Method &frame) void ChannelPrivate::closeOk() { - QAMQP::Frame::Method frame(QAMQP::Frame::fcChannel, miCloseOk); + Frame::Method frame(Frame::fcChannel, miCloseOk); sendFrame(frame); } -void ChannelPrivate::closeOk(const QAMQP::Frame::Method &frame) +void ChannelPrivate::closeOk(const Frame::Method &frame) { Q_UNUSED(frame) - Q_Q(QAMQP::Channel); + Q_Q(Channel); q->stateChanged(csClosed); q->onClose(); opened = false; } -void ChannelPrivate::openOk(const QAMQP::Frame::Method &frame) +void ChannelPrivate::openOk(const Frame::Method &frame) { Q_UNUSED(frame) - Q_Q(QAMQP::Channel); + Q_Q(Channel); qDebug(">> OpenOK"); opened = true; diff --git a/src/amqp_channel.h b/src/amqp_channel.h index 2a2ecc6..09ed038 100644 --- a/src/amqp_channel.h +++ b/src/amqp_channel.h @@ -2,7 +2,6 @@ #define amqp_channel_h__ #include -#include "amqp_global.h" #include "amqp_frame.h" namespace QAMQP @@ -13,46 +12,48 @@ class ChannelPrivate; class Channel : public QObject, public Frame::MethodHandler { Q_OBJECT - Q_PROPERTY(int number READ channelNumber) + Q_PROPERTY(int number READ channelNumber CONSTANT) + Q_PROPERTY(bool opened READ isOpened CONSTANT) Q_PROPERTY(QString name READ name WRITE setName) - public: - ~Channel(); + virtual ~Channel(); - void closeChannel(); - void reopen(); - - QString name() const; int channelNumber() const; - - void setName(const QString &name); - void setQOS(qint32 prefetchSize, quint16 prefetchCount); bool isOpened() const; -signals: + QString name() const; + void setName(const QString &name); + +public Q_SLOTS: + void closeChannel(); + void reopen(); + void setQOS(qint32 prefetchSize, quint16 prefetchCount); + +Q_SIGNALS: void opened(); void closed(); void flowChanged(bool enabled); protected: - Q_DISABLE_COPY(Channel) - Q_DECLARE_PRIVATE(QAMQP::Channel) - Channel(int channelNumber = -1, Client *parent = 0); Channel(ChannelPrivate *dd, Client *parent = 0); + + Q_DISABLE_COPY(Channel) + Q_DECLARE_PRIVATE(Channel) QScopedPointer d_ptr; Q_PRIVATE_SLOT(d_func(), void _q_open()) Q_PRIVATE_SLOT(d_func(), void _q_disconnected()) + // should move to private classes virtual void onOpen(); virtual void onClose(); void stateChanged(int state); - void _q_method(const QAMQP::Frame::Method &frame); + void _q_method(const Frame::Method &frame); friend class ClientPrivate; }; -} +} // namespace QAMQP #endif diff --git a/src/amqp_channel_p.h b/src/amqp_channel_p.h index b719f47..556dd9a 100644 --- a/src/amqp_channel_p.h +++ b/src/amqp_channel_p.h @@ -1,7 +1,6 @@ #ifndef amqp_channel_p_h__ #define amqp_channel_p_h__ -#include "amqp_global.h" #include #define METHOD_ID_ENUM(name, id) name = id, name ## Ok @@ -81,5 +80,6 @@ public: Channel * const q_ptr; }; -} +} // namespace QAMQP + #endif // amqp_channel_p_h__ diff --git a/src/amqp_client.cpp b/src/amqp_client.cpp index 431319c..543cc57 100644 --- a/src/amqp_client.cpp +++ b/src/amqp_client.cpp @@ -1,8 +1,6 @@ #include "amqp_client.h" #include "amqp_client_p.h" - -#include -#include +#include "amqp_global.h" #include "amqp_exchange.h" #include "amqp_exchange_p.h" #include "amqp_queue.h" @@ -10,6 +8,8 @@ #include "amqp_connection_p.h" #include "amqp_authenticator.h" +#include + using namespace QAMQP; ClientPrivate::ClientPrivate(Client * q) diff --git a/src/amqp_client.h b/src/amqp_client.h index 636efd9..8126614 100644 --- a/src/amqp_client.h +++ b/src/amqp_client.h @@ -4,8 +4,6 @@ #include #include -#include "amqp_global.h" - namespace QAMQP { diff --git a/src/amqp_client_p.h b/src/amqp_client_p.h index d2d9e7a..6a33c60 100644 --- a/src/amqp_client_p.h +++ b/src/amqp_client_p.h @@ -3,7 +3,6 @@ #include -#include "amqp_global.h" #include "amqp_network.h" #include "amqp_connection.h" #include "amqp_authenticator.h" @@ -47,5 +46,6 @@ public: }; -} +} // namespace QAMQP + #endif // amqp_client_p_h__ diff --git a/src/amqp_connection.cpp b/src/amqp_connection.cpp index de95c08..37a7561 100644 --- a/src/amqp_connection.cpp +++ b/src/amqp_connection.cpp @@ -3,11 +3,10 @@ #include "amqp_client.h" #include "amqp_client_p.h" #include "amqp_frame.h" +#include "amqp_global.h" -#include #include #include -#include #include using namespace QAMQP; @@ -25,7 +24,7 @@ ConnectionPrivate::~ConnectionPrivate() void ConnectionPrivate::init(Client * parent) { - Q_Q(QAMQP::Connection); + Q_Q(Connection); q->setParent(parent); client_ = parent; heartbeatTimer_ = new QTimer(parent); @@ -34,19 +33,19 @@ void ConnectionPrivate::init(Client * parent) void ConnectionPrivate::startOk() { - QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miStartOk); + Frame::Method frame(Frame::fcConnection, miStartOk); QByteArray arguments_; QDataStream stream(&arguments_, QIODevice::WriteOnly); - QAMQP::Frame::TableField clientProperties; + Frame::TableField clientProperties; clientProperties["version"] = QString(QAMQP_VERSION); clientProperties["platform"] = QString("Qt %1").arg(qVersion()); clientProperties["product"] = QString("QAMQP"); clientProperties.unite(customProperty); - QAMQP::Frame::serialize(stream, clientProperties); + Frame::serialize(stream, clientProperties); client_->d_func()->auth_->write(stream); - QAMQP::Frame::writeField('s', stream, "en_US"); + Frame::writeField('s', stream, "en_US"); frame.setArguments(arguments_); client_->d_func()->network_->sendFrame(frame); } @@ -57,7 +56,7 @@ void ConnectionPrivate::secureOk() void ConnectionPrivate::tuneOk() { - QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miTuneOk); + Frame::Method frame(Frame::fcConnection, miTuneOk); QByteArray arguments_; QDataStream stream(&arguments_, QIODevice::WriteOnly); @@ -71,11 +70,11 @@ void ConnectionPrivate::tuneOk() void ConnectionPrivate::open() { - QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miOpen); + Frame::Method frame(Frame::fcConnection, miOpen); QByteArray arguments_; QDataStream stream(&arguments_, QIODevice::WriteOnly); - QAMQP::Frame::writeField('s',stream, client_->virtualHost()); + Frame::writeField('s',stream, client_->virtualHost()); stream << qint8(0); stream << qint8(0); @@ -84,7 +83,7 @@ void ConnectionPrivate::open() client_->d_func()->network_->sendFrame(frame); } -void ConnectionPrivate::start(const QAMQP::Frame::Method &frame) +void ConnectionPrivate::start(const Frame::Method &frame) { qDebug(">> Start"); QByteArray data = frame.arguments(); @@ -94,16 +93,16 @@ void ConnectionPrivate::start(const QAMQP::Frame::Method &frame) stream >> version_major >> version_minor; - QAMQP::Frame::TableField table; - QAMQP::Frame::deserialize(stream, table); + Frame::TableField table; + Frame::deserialize(stream, table); - QString mechanisms = QAMQP::Frame::readField('S', stream).toString(); - QString locales = QAMQP::Frame::readField('S', stream).toString(); + QString mechanisms = Frame::readField('S', stream).toString(); + QString locales = Frame::readField('S', stream).toString(); qDebug(">> version_major: %d", version_major); qDebug(">> version_minor: %d", version_minor); - QAMQP::Frame::print(table); + Frame::print(table); qDebug(">> mechanisms: %s", qPrintable(mechanisms)); qDebug(">> locales: %s", qPrintable(locales)); @@ -111,19 +110,19 @@ void ConnectionPrivate::start(const QAMQP::Frame::Method &frame) startOk(); } -void ConnectionPrivate::secure(const QAMQP::Frame::Method &frame) +void ConnectionPrivate::secure(const Frame::Method &frame) { Q_UNUSED(frame) } -void ConnectionPrivate::tune(const QAMQP::Frame::Method &frame) +void ConnectionPrivate::tune(const Frame::Method &frame) { qDebug(">> Tune"); QByteArray data = frame.arguments(); QDataStream stream(&data, QIODevice::ReadOnly); qint16 channel_max = 0, - heartbeat = 0; + heartbeat = 0; qint32 frame_max = 0; stream >> channel_max; @@ -146,26 +145,26 @@ void ConnectionPrivate::tune(const QAMQP::Frame::Method &frame) open(); } -void ConnectionPrivate::openOk(const QAMQP::Frame::Method &frame) +void ConnectionPrivate::openOk(const Frame::Method &frame) { Q_UNUSED(frame) - Q_Q(QAMQP::Connection); + Q_Q(Connection); qDebug(">> OpenOK"); connected = true; q->openOk(); } -void ConnectionPrivate::close(const QAMQP::Frame::Method &frame) +void ConnectionPrivate::close(const Frame::Method &frame) { - Q_Q(QAMQP::Connection); + Q_Q(Connection); qDebug(">> CLOSE"); QByteArray data = frame.arguments(); QDataStream stream(&data, QIODevice::ReadOnly); qint16 code_ = 0, classId, methodId; stream >> code_; - QString text(QAMQP::Frame::readField('s', stream).toString()); + QString text(Frame::readField('s', stream).toString()); stream >> classId; stream >> methodId; @@ -180,14 +179,14 @@ void ConnectionPrivate::close(const QAMQP::Frame::Method &frame) void ConnectionPrivate::close(int code, const QString &text, int classId, int methodId) { - QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miClose); + Frame::Method frame(Frame::fcConnection, miClose); QByteArray arguments_; QDataStream stream(&arguments_, QIODevice::WriteOnly); - QAMQP::Frame::writeField('s',stream, client_->virtualHost()); + Frame::writeField('s',stream, client_->virtualHost()); stream << qint16(code); - QAMQP::Frame::writeField('s', stream, text); + Frame::writeField('s', stream, text); stream << qint16(classId); stream << qint16(methodId); @@ -197,16 +196,15 @@ void ConnectionPrivate::close(int code, const QString &text, int classId, int me void ConnectionPrivate::closeOk() { - QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miCloseOk); + Frame::Method frame(Frame::fcConnection, miCloseOk); connected = false; client_->d_func()->network_->sendFrame(frame); - } -void ConnectionPrivate::closeOk(const QAMQP::Frame::Method &frame) +void ConnectionPrivate::closeOk(const Frame::Method &frame) { Q_UNUSED(frame) - Q_Q(QAMQP::Connection); + Q_Q(Connection); connected = false; QMetaObject::invokeMethod(q, "disconnected"); @@ -214,10 +212,9 @@ void ConnectionPrivate::closeOk(const QAMQP::Frame::Method &frame) heartbeatTimer_->stop(); } - void ConnectionPrivate::setQOS(qint32 prefetchSize, quint16 prefetchCount, int channel, bool global) { - QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, 10); + Frame::Method frame(Frame::fcBasic, 10); frame.setChannel(channel); QByteArray arguments_; QDataStream out(&arguments_, QIODevice::WriteOnly); @@ -230,10 +227,10 @@ void ConnectionPrivate::setQOS(qint32 prefetchSize, quint16 prefetchCount, int c client_->d_func()->network_->sendFrame(frame); } -bool ConnectionPrivate::_q_method(const QAMQP::Frame::Method &frame) +bool ConnectionPrivate::_q_method(const Frame::Method &frame) { - Q_ASSERT(frame.methodClass() == QAMQP::Frame::fcConnection); - if (frame.methodClass() != QAMQP::Frame::fcConnection) + Q_ASSERT(frame.methodClass() == Frame::fcConnection); + if (frame.methodClass() != Frame::fcConnection) return true; qDebug() << "Connection:"; @@ -274,7 +271,7 @@ bool ConnectionPrivate::_q_method(const QAMQP::Frame::Method &frame) void ConnectionPrivate::_q_heartbeat() { - QAMQP::Frame::Heartbeat frame; + Frame::Heartbeat frame; client_->d_func()->network_->sendFrame(frame); } @@ -284,7 +281,7 @@ Connection::Connection(Client *parent) : QObject(parent), d_ptr(new ConnectionPrivate(this)) { - Q_D(QAMQP::Connection); + Q_D(Connection); d->init(parent); } @@ -294,37 +291,37 @@ Connection::~Connection() void Connection::startOk() { - Q_D(QAMQP::Connection); + Q_D(Connection); d->startOk(); } void Connection::secureOk() { - Q_D(QAMQP::Connection); + Q_D(Connection); d->secureOk(); } void Connection::tuneOk() { - Q_D(QAMQP::Connection); + Q_D(Connection); d->tuneOk(); } void Connection::open() { - Q_D(QAMQP::Connection); + Q_D(Connection); d->open(); } void Connection::close(int code, const QString &text, int classId , int methodId) { - Q_D(QAMQP::Connection); + Q_D(Connection); d->close(code, text, classId, methodId); } void Connection::closeOk() { - Q_D(QAMQP::Connection); + Q_D(Connection); d->closeOk(); Q_EMIT disconnect(); } @@ -334,33 +331,33 @@ void Connection::openOk() Q_EMIT connected(); } -void Connection::_q_method(const QAMQP::Frame::Method &frame) +void Connection::_q_method(const Frame::Method &frame) { - Q_D(QAMQP::Connection); + Q_D(Connection); d->_q_method(frame); } bool Connection::isConnected() const { - Q_D(const QAMQP::Connection); + Q_D(const Connection); return d->connected; } void Connection::setQOS(qint32 prefetchSize, quint16 prefetchCount) { - Q_D(QAMQP::Connection); + Q_D(Connection); d->setQOS(prefetchSize, prefetchCount, 0, true); } void Connection::addCustomProperty(const QString &name, const QString &value) { - Q_D(QAMQP::Connection); + Q_D(Connection); d->customProperty[name] = value; } QString Connection::customProperty(const QString &name) const { - Q_D(const QAMQP::Connection); + Q_D(const Connection); if (d->customProperty.contains(name)) return d->customProperty.value(name).toString(); return QString(); diff --git a/src/amqp_connection.h b/src/amqp_connection.h index a4a94f0..dc627a4 100644 --- a/src/amqp_connection.h +++ b/src/amqp_connection.h @@ -3,7 +3,6 @@ #include #include "amqp_frame.h" -#include "amqp_global.h" namespace QAMQP { @@ -15,12 +14,17 @@ class ConnectionPrivate; class Connection : public QObject, public Frame::MethodHandler { Q_OBJECT + Q_PROPERTY(bool connected READ isConnected CONSTANT) + public: virtual ~Connection(); void addCustomProperty(const QString &name, const QString &value); QString customProperty(const QString &name) const; + bool isConnected() const; + +public Q_SLOTS: void startOk(); void secureOk(); void tuneOk(); @@ -28,8 +32,6 @@ public: void close(int code, const QString &text, int classId = 0, int methodId = 0); void closeOk(); - bool isConnected() const; - void setQOS(qint32 prefetchSize, quint16 prefetchCount); Q_SIGNALS: @@ -37,20 +39,21 @@ Q_SIGNALS: void connected(); private: + explicit Connection(Client * parent = 0); + Q_DISABLE_COPY(Connection) Q_DECLARE_PRIVATE(Connection) QScopedPointer d_ptr; - Connection(Client * parent = 0); - - void openOk(); + Q_PRIVATE_SLOT(d_func(), void _q_heartbeat()) friend class ClientPrivate; friend class ChannelPrivate; - void _q_method(const QAMQP::Frame::Method &frame); - Q_PRIVATE_SLOT(d_func(), void _q_heartbeat()) + // should be moved to private + void openOk(); + void _q_method(const Frame::Method &frame); }; -} +} // namespace QAMQP #endif // amqp_connection_h__ diff --git a/src/amqp_connection_p.h b/src/amqp_connection_p.h index a6df532..79a79ae 100644 --- a/src/amqp_connection_p.h +++ b/src/amqp_connection_p.h @@ -1,14 +1,14 @@ #ifndef amqp_connection_p_h__ #define amqp_connection_p_h__ -#define METHOD_ID_ENUM(name, id) name = id, name ## Ok - #include -class QTimer; +#define METHOD_ID_ENUM(name, id) name = id, name ## Ok +class QTimer; namespace QAMQP { + class Client; class ClientPrivate; class Connection; @@ -34,14 +34,14 @@ public: void close(int code, const QString & text, int classId = 0, int methodId = 0); void closeOk(); - void start(const QAMQP::Frame::Method &frame); - void secure(const QAMQP::Frame::Method &frame); - void tune(const QAMQP::Frame::Method &frame); - void openOk(const QAMQP::Frame::Method &frame); - void close(const QAMQP::Frame::Method &frame); - void closeOk(const QAMQP::Frame::Method &frame); + void start(const Frame::Method &frame); + void secure(const Frame::Method &frame); + void tune(const Frame::Method &frame); + void openOk(const Frame::Method &frame); + void close(const Frame::Method &frame); + void closeOk(const Frame::Method &frame); - bool _q_method(const QAMQP::Frame::Method &frame); + bool _q_method(const Frame::Method &frame); void _q_heartbeat(); void setQOS(qint32 prefetchSize, quint16 prefetchCount, int channel, bool global); @@ -51,12 +51,12 @@ public: bool connected; QPointer heartbeatTimer_; - QAMQP::Frame::TableField customProperty; + Frame::TableField customProperty; - Q_DECLARE_PUBLIC(QAMQP::Connection) + Q_DECLARE_PUBLIC(Connection) Connection * const q_ptr; }; -} +} // namespace QAMQP #endif // amqp_connection_p_h__ diff --git a/src/amqp_exchange.cpp b/src/amqp_exchange.cpp index 2f20a2b..da784dc 100644 --- a/src/amqp_exchange.cpp +++ b/src/amqp_exchange.cpp @@ -1,18 +1,17 @@ #include "amqp_exchange.h" #include "amqp_exchange_p.h" #include "amqp_queue.h" +#include "amqp_global.h" + +#include +#include using namespace QAMQP; -using namespace QAMQP::Frame; - -#include -#include -#include Exchange::Exchange(int channelNumber, Client *parent) : Channel(new ExchangePrivate(this), parent) { - Q_D(QAMQP::Exchange); + Q_D(Exchange); d->init(channelNumber, parent); } @@ -23,32 +22,32 @@ Exchange::~Exchange() void Exchange::onOpen() { - Q_D(QAMQP::Exchange); + Q_D(Exchange); if (d->delayedDeclare) d->declare(); } void Exchange::onClose() { - Q_D(QAMQP::Exchange); + Q_D(Exchange); d->remove(true, true); } Exchange::ExchangeOptions Exchange::option() const { - Q_D(const QAMQP::Exchange); + Q_D(const Exchange); return d->options; } QString Exchange::type() const { - Q_D(const QAMQP::Exchange); + Q_D(const Exchange); return d->type; } -void Exchange::declare(const QString &type, ExchangeOptions option , const TableField &arg) +void Exchange::declare(const QString &type, ExchangeOptions option , const Frame::TableField &arg) { - Q_D(QAMQP::Exchange); + Q_D(Exchange); d->options = option; d->type = type; d->arguments = arg; @@ -57,13 +56,13 @@ void Exchange::declare(const QString &type, ExchangeOptions option , const Table void Exchange::remove(bool ifUnused, bool noWait) { - Q_D(QAMQP::Exchange); + Q_D(Exchange); d->remove(ifUnused, noWait); } -void Exchange::bind(QAMQP::Queue *queue) +void Exchange::bind(Queue *queue) { - Q_D(QAMQP::Exchange); + Q_D(Exchange); queue->bind(this, d->name); } @@ -82,14 +81,14 @@ void Exchange::bind(const QString &queueName, const QString &key) void Exchange::publish(const QString &message, const QString &key, const MessageProperties &prop) { - Q_D(QAMQP::Exchange); + Q_D(Exchange); d->publish(message.toUtf8(), key, QLatin1String("text.plain"), QVariantHash(), prop); } void Exchange::publish(const QByteArray &message, const QString &key, const QString &mimeType, const MessageProperties &prop) { - Q_D(QAMQP::Exchange); + Q_D(Exchange); d->publish(message, key, mimeType, QVariantHash(), prop); } @@ -97,7 +96,7 @@ void Exchange::publish(const QByteArray &message, const QString &key, const QVariantHash &headers, const QString &mimeType, const MessageProperties &prop) { - Q_D(QAMQP::Exchange); + Q_D(Exchange); d->publish(message, key, mimeType, headers, prop); } @@ -114,12 +113,12 @@ ExchangePrivate::~ExchangePrivate() { } -bool ExchangePrivate::_q_method(const QAMQP::Frame::Method &frame) +bool ExchangePrivate::_q_method(const Frame::Method &frame) { if (ChannelPrivate::_q_method(frame)) return true; - if (frame.methodClass() != QAMQP::Frame::fcExchange) + if (frame.methodClass() != Frame::fcExchange) return false; switch(frame.id()) { @@ -136,19 +135,19 @@ bool ExchangePrivate::_q_method(const QAMQP::Frame::Method &frame) return true; } -void ExchangePrivate::declareOk(const QAMQP::Frame::Method &frame) +void ExchangePrivate::declareOk(const Frame::Method &frame) { Q_UNUSED(frame) - Q_Q(QAMQP::Exchange); + Q_Q(Exchange); qDebug() << "Declared exchange: " << name; declared = true; QMetaObject::invokeMethod(q, "declared"); } -void ExchangePrivate::deleteOk(const QAMQP::Frame::Method &frame) +void ExchangePrivate::deleteOk(const Frame::Method &frame) { Q_UNUSED(frame) - Q_Q(QAMQP::Exchange); + Q_Q(Exchange); qDebug() << "Deleted exchange: " << name; declared = false; QMetaObject::invokeMethod(q, "removed"); @@ -164,16 +163,16 @@ void ExchangePrivate::declare() if (name.isEmpty()) return; - QAMQP::Frame::Method frame(QAMQP::Frame::fcExchange, miDeclare); + Frame::Method frame(Frame::fcExchange, miDeclare); frame.setChannel(number); QByteArray arguments_; QDataStream stream(&arguments_, QIODevice::WriteOnly); stream << qint16(0); //reserver 1 - writeField('s', stream, name); - writeField('s', stream, type); + Frame::writeField('s', stream, name); + Frame::writeField('s', stream, type); stream << qint8(options); - writeField('F', stream, ExchangePrivate::arguments); + Frame::writeField('F', stream, ExchangePrivate::arguments); frame.setArguments(arguments_); sendFrame(frame); @@ -182,13 +181,13 @@ void ExchangePrivate::declare() void ExchangePrivate::remove(bool ifUnused, bool noWait) { - QAMQP::Frame::Method frame(QAMQP::Frame::fcExchange, miDelete); + Frame::Method frame(Frame::fcExchange, miDelete); frame.setChannel(number); QByteArray arguments_; QDataStream stream(&arguments_, QIODevice::WriteOnly); stream << qint16(0); //reserver 1 - writeField('s', stream, name); + Frame::writeField('s', stream, name); qint8 flag = 0; @@ -205,25 +204,25 @@ void ExchangePrivate::publish(const QByteArray &message, const QString &key, const QString &mimeType, const QVariantHash & headers, const Exchange::MessageProperties &prop) { - QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmPublish); + Frame::Method frame(Frame::fcBasic, bmPublish); frame.setChannel(number); QByteArray arguments_; QDataStream out(&arguments_, QIODevice::WriteOnly); out << qint16(0); //reserver 1 - writeField('s', out, name); - writeField('s', out, key); + Frame::writeField('s', out, name); + Frame::writeField('s', out, key); out << qint8(0); frame.setArguments(arguments_); sendFrame(frame); - QAMQP::Frame::Content content(QAMQP::Frame::fcBasic); + Frame::Content content(Frame::fcBasic); content.setChannel(number); - content.setProperty(Content::cpContentType, mimeType); - content.setProperty(Content::cpContentEncoding, "utf-8"); - content.setProperty(Content::cpHeaders, headers); - content.setProperty(Content::cpMessageId, "0"); + content.setProperty(Frame::Content::cpContentType, mimeType); + content.setProperty(Frame::Content::cpContentEncoding, "utf-8"); + content.setProperty(Frame::Content::cpHeaders, headers); + content.setProperty(Frame::Content::cpMessageId, "0"); Exchange::MessageProperties::ConstIterator i; @@ -235,7 +234,7 @@ void ExchangePrivate::publish(const QByteArray &message, const QString &key, int fullSize = message.size(); for (int sended_ = 0; sended_ < fullSize; sended_+= (FRAME_MAX - 7)) { - QAMQP::Frame::ContentBody body; + Frame::ContentBody body; QByteArray partition_ = message.mid(sended_, (FRAME_MAX - 7)); body.setChannel(number); body.setBody(partition_); diff --git a/src/amqp_exchange.h b/src/amqp_exchange.h index 7ec1acd..79db578 100644 --- a/src/amqp_exchange.h +++ b/src/amqp_exchange.h @@ -6,8 +6,6 @@ namespace QAMQP { -using namespace QAMQP::Frame; - class Client; class Queue; class ClientPrivate; @@ -30,7 +28,7 @@ public: }; Q_DECLARE_FLAGS(ExchangeOptions, ExchangeOption) - typedef QHash MessageProperties; + typedef QHash MessageProperties; virtual ~Exchange(); @@ -39,10 +37,10 @@ public: void declare(const QString &type = QLatin1String("direct"), ExchangeOptions option = NoOptions, - const TableField &arg = TableField()); + const Frame::TableField &arg = Frame::TableField()); void remove(bool ifUnused = true, bool noWait = true); - void bind(QAMQP::Queue *queue); + void bind(Queue *queue); void bind(const QString &queueName); void bind(const QString &queueName, const QString &key); @@ -62,14 +60,15 @@ protected: void onClose(); private: + explicit Exchange(int channelNumber = -1, Client * parent = 0); + Q_DISABLE_COPY(Exchange) - Q_DECLARE_PRIVATE(QAMQP::Exchange) - Exchange(int channelNumber = -1, Client * parent = 0); + Q_DECLARE_PRIVATE(Exchange) friend class ClientPrivate; }; -} +} // namespace QAMQP Q_DECLARE_OPERATORS_FOR_FLAGS(QAMQP::Exchange::ExchangeOptions) diff --git a/src/amqp_exchange_p.h b/src/amqp_exchange_p.h index 241ff04..b4afea7 100644 --- a/src/amqp_exchange_p.h +++ b/src/amqp_exchange_p.h @@ -1,3 +1,6 @@ +#ifndef amqp_exchange_p_h__ +#define amqp_exchange_p_h__ + #include "amqp_channel_p.h" #define METHOD_ID_ENUM(name, id) name = id, name ## Ok @@ -5,7 +8,6 @@ namespace QAMQP { -using namespace QAMQP::Frame; class ExchangePrivate: public ChannelPrivate { public: @@ -20,8 +22,8 @@ public: void declare(); void remove(bool ifUnused = true, bool noWait = true); - void declareOk(const QAMQP::Frame::Method &frame); - void deleteOk(const QAMQP::Frame::Method &frame); + void declareOk(const Frame::Method &frame); + void deleteOk(const Frame::Method &frame); void publish(const QByteArray &message, const QString &key, const QString &mimeType = QLatin1String("text/plain"), @@ -30,15 +32,17 @@ public: QString type; Exchange::ExchangeOptions options; - TableField arguments; + Frame::TableField arguments; - bool _q_method(const QAMQP::Frame::Method &frame); + bool _q_method(const Frame::Method &frame); void _q_disconnected(); bool delayedDeclare; bool declared; - Q_DECLARE_PUBLIC(QAMQP::Exchange) + Q_DECLARE_PUBLIC(Exchange) }; -} +} // namespace QAMQP + +#endif // amqp_exchange_p_h__ diff --git a/src/amqp_frame.cpp b/src/amqp_frame.cpp index 268859c..e1d566a 100644 --- a/src/amqp_frame.cpp +++ b/src/amqp_frame.cpp @@ -6,7 +6,9 @@ #include #include +using namespace QAMQP; using namespace QAMQP::Frame; + Base::Base(Type type) : size_(0), type_(type), @@ -42,42 +44,44 @@ qint32 Base::size() const return 0; } -void QAMQP::Frame::Base::writeHeader(QDataStream &stream) const +void Base::writeHeader(QDataStream &stream) const { stream << type_; stream << channel_; stream << qint32(size()); } -void QAMQP::Frame::Base::writeEnd(QDataStream &stream) const +void Base::writeEnd(QDataStream &stream) const { stream << qint8(FRAME_END); } -void QAMQP::Frame::Base::writePayload( QDataStream & ) const{} +void Base::writePayload(QDataStream &stream) const +{ + Q_UNUSED(stream) +} -void QAMQP::Frame::Base::readHeader( QDataStream & stream ) +void Base::readHeader(QDataStream &stream) { stream >> type_; stream >> channel_; stream >> size_; } -void QAMQP::Frame::Base::readEnd(QDataStream &stream) +void Base::readEnd(QDataStream &stream) { unsigned char end_ = 0; stream.readRawData(reinterpret_cast(&end_), sizeof(end_)); - if (end_ != FRAME_END) { + if (end_ != FRAME_END) qWarning("Wrong end of frame"); - } } -void QAMQP::Frame::Base::readPayload(QDataStream &stream) +void Base::readPayload(QDataStream &stream) { stream.skipRawData(size_); } -void QAMQP::Frame::Base::toStream(QDataStream &stream) const +void Base::toStream(QDataStream &stream) const { writeHeader(stream); writePayload(stream); @@ -86,47 +90,47 @@ void QAMQP::Frame::Base::toStream(QDataStream &stream) const ////////////////////////////////////////////////////////////////////////// -QAMQP::Frame::Method::Method(MethodClass methodClass, qint16 id) +Frame::Method::Method(MethodClass methodClass, qint16 id) : Base(ftMethod), methodClass_(methodClass), id_(id) { } -QAMQP::Frame::Method::Method(QDataStream &raw) +Frame::Method::Method(QDataStream &raw) : Base(raw) { readPayload(raw); } -QAMQP::Frame::Method::Method(): Base(ftMethod) +Frame::Method::Method(): Base(ftMethod) { } -MethodClass QAMQP::Frame::Method::methodClass() const +MethodClass Frame::Method::methodClass() const { return MethodClass(methodClass_); } -qint16 QAMQP::Frame::Method::id() const +qint16 Frame::Method::id() const { return id_; } -qint32 QAMQP::Frame::Method::size() const +qint32 Frame::Method::size() const { return sizeof(id_) + sizeof(methodClass_) + arguments_.size(); } -void QAMQP::Frame::Method::setArguments(const QByteArray &data) +void Frame::Method::setArguments(const QByteArray &data) { arguments_ = data; } -QByteArray QAMQP::Frame::Method::arguments() const +QByteArray Frame::Method::arguments() const { return arguments_; } -void QAMQP::Frame::Method::readPayload(QDataStream &stream) +void Frame::Method::readPayload(QDataStream &stream) { stream >> methodClass_; stream >> id_; @@ -135,7 +139,7 @@ void QAMQP::Frame::Method::readPayload(QDataStream &stream) stream.readRawData(arguments_.data(), arguments_.size()); } -void QAMQP::Frame::Method::writePayload(QDataStream &stream) const +void Frame::Method::writePayload(QDataStream &stream) const { stream << quint16(methodClass_); stream << quint16(id_); @@ -144,7 +148,7 @@ void QAMQP::Frame::Method::writePayload(QDataStream &stream) const ////////////////////////////////////////////////////////////////////////// -QVariant QAMQP::Frame::readField(qint8 valueType, QDataStream &s) +QVariant Frame::readField(qint8 valueType, QDataStream &s) { QVariant value; QByteArray tmp; @@ -224,10 +228,10 @@ QVariant QAMQP::Frame::readField(qint8 valueType, QDataStream &s) } case 'D': { - QAMQP::Frame::decimal v; + Frame::decimal v; s >> v.scale; s >> v.value; - value = QVariant::fromValue(v); + value = QVariant::fromValue(v); } break; case 's': @@ -281,7 +285,7 @@ QVariant QAMQP::Frame::readField(qint8 valueType, QDataStream &s) return value; } -QDataStream & QAMQP::Frame::deserialize(QDataStream &stream, QAMQP::Frame::TableField &f) +QDataStream & Frame::deserialize(QDataStream &stream, Frame::TableField &f) { QByteArray data; stream >> data; @@ -297,7 +301,7 @@ QDataStream & QAMQP::Frame::deserialize(QDataStream &stream, QAMQP::Frame::Table return stream; } -QDataStream & QAMQP::Frame::serialize(QDataStream &stream, const TableField &f) +QDataStream & Frame::serialize(QDataStream &stream, const TableField &f) { QByteArray data; QDataStream s(&data, QIODevice::WriteOnly); @@ -316,7 +320,7 @@ QDataStream & QAMQP::Frame::serialize(QDataStream &stream, const TableField &f) return stream; } -void QAMQP::Frame::print(const TableField &f) +void Frame::print(const TableField &f) { TableField::ConstIterator i; for (i = f.begin(); i != f.end(); ++i) { @@ -333,7 +337,7 @@ void QAMQP::Frame::print(const TableField &f) } } -void QAMQP::Frame::writeField(qint8 valueType, QDataStream &s, const QVariant &value, bool withType) +void Frame::writeField(qint8 valueType, QDataStream &s, const QVariant &value, bool withType) { QByteArray tmp; if (withType) @@ -375,7 +379,7 @@ void QAMQP::Frame::writeField(qint8 valueType, QDataStream &s, const QVariant &v break; case 'D': { - QAMQP::Frame::decimal v(value.value()); + Frame::decimal v(value.value()); s << v.scale; s << v.value; } @@ -418,7 +422,7 @@ void QAMQP::Frame::writeField(qint8 valueType, QDataStream &s, const QVariant &v } } -void QAMQP::Frame::writeField(QDataStream &s, const QVariant &value) +void Frame::writeField(QDataStream &s, const QVariant &value) { char type = 0; switch (value.type()) { @@ -484,29 +488,29 @@ void QAMQP::Frame::writeField(QDataStream &s, const QVariant &value) ////////////////////////////////////////////////////////////////////////// -QAMQP::Frame::Content::Content() +Content::Content() : Base(ftHeader) { } -QAMQP::Frame::Content::Content(MethodClass methodClass) +Content::Content(MethodClass methodClass) : Base(ftHeader) { methodClass_ = methodClass; } -QAMQP::Frame::Content::Content(QDataStream &raw) +Content::Content(QDataStream &raw) : Base(raw) { readPayload(raw); } -QAMQP::Frame::MethodClass QAMQP::Frame::Content::methodClass() const +MethodClass Content::methodClass() const { return MethodClass(methodClass_); } -qint32 QAMQP::Frame::Content::size() const +qint32 Content::size() const { QDataStream out(&buffer_, QIODevice::WriteOnly); buffer_.clear(); @@ -564,32 +568,32 @@ qint32 QAMQP::Frame::Content::size() const return buffer_.size(); } -void QAMQP::Frame::Content::setBody(const QByteArray &data) +void Content::setBody(const QByteArray &data) { body_ = data; } -QByteArray QAMQP::Frame::Content::body() const +QByteArray Content::body() const { return body_; } -void QAMQP::Frame::Content::setProperty(Property prop, const QVariant &value) +void Content::setProperty(Property prop, const QVariant &value) { properties_[prop] = value; } -QVariant QAMQP::Frame::Content::property(Property prop) const +QVariant Content::property(Property prop) const { return properties_.value(prop); } -void QAMQP::Frame::Content::writePayload(QDataStream &out) const +void Content::writePayload(QDataStream &out) const { out.writeRawData(buffer_.data(), buffer_.size()); } -void QAMQP::Frame::Content::readPayload(QDataStream &in) +void Content::readPayload(QDataStream &in) { in >> methodClass_; in.skipRawData(2); //weight @@ -639,7 +643,7 @@ void QAMQP::Frame::Content::readPayload(QDataStream &in) properties_[cpClusterID] = readField('s', in); } -qlonglong QAMQP::Frame::Content::bodySize() const +qlonglong Content::bodySize() const { return body_.isEmpty() ? bodySize_ : body_.size(); } @@ -650,44 +654,51 @@ ContentBody::ContentBody() { } -QAMQP::Frame::ContentBody::ContentBody(QDataStream &raw) +ContentBody::ContentBody(QDataStream &raw) : Base(raw) { readPayload(raw); } -void QAMQP::Frame::ContentBody::setBody(const QByteArray &data) +void ContentBody::setBody(const QByteArray &data) { body_ = data; } -QByteArray QAMQP::Frame::ContentBody::body() const +QByteArray ContentBody::body() const { return body_; } -void QAMQP::Frame::ContentBody::writePayload(QDataStream &out) const +void ContentBody::writePayload(QDataStream &out) const { out.writeRawData(body_.data(), body_.size()); } -void QAMQP::Frame::ContentBody::readPayload(QDataStream &in) +void ContentBody::readPayload(QDataStream &in) { body_.resize(size_); in.readRawData(body_.data(), body_.size()); } -qint32 QAMQP::Frame::ContentBody::size() const +qint32 ContentBody::size() const { return body_.size(); } ////////////////////////////////////////////////////////////////////////// -QAMQP::Frame::Heartbeat::Heartbeat() +Heartbeat::Heartbeat() : Base(ftHeartbeat) { } -void QAMQP::Frame::Heartbeat::readPayload(QDataStream &) {} -void QAMQP::Frame::Heartbeat::writePayload(QDataStream &) const {} +void Heartbeat::readPayload(QDataStream &stream) +{ + Q_UNUSED(stream) +} + +void Heartbeat::writePayload(QDataStream &stream) const +{ + Q_UNUSED(stream) +} diff --git a/src/amqp_frame.h b/src/amqp_frame.h index 91dd85a..513cf8e 100644 --- a/src/amqp_frame.h +++ b/src/amqp_frame.h @@ -28,7 +28,6 @@ namespace QAMQP { class QueuePrivate; - namespace Frame { typedef quint16 channel_t; @@ -86,12 +85,12 @@ namespace Frame */ typedef QHash TableField; - QDataStream & serialize( QDataStream & stream, const QAMQP::Frame::TableField & f ); - QDataStream & deserialize( QDataStream & stream, QAMQP::Frame::TableField & f ); + QDataStream & serialize( QDataStream & stream, const Frame::TableField & f ); + QDataStream & deserialize( QDataStream & stream, Frame::TableField & f ); QVariant readField( qint8 valueType, QDataStream &s ); void writeField( QDataStream &s, const QVariant & value ); void writeField( qint8 valueType, QDataStream &s, const QVariant & value, bool withType = false ); - void print( const QAMQP::Frame::TableField & f ); + void print( const Frame::TableField & f ); /* * @brief Base class for any frames. @@ -280,8 +279,6 @@ namespace Frame */ class Content : public Base { - friend class QAMQP::QueuePrivate; - public: /* * Default content frame property @@ -357,6 +354,9 @@ namespace Frame mutable QByteArray buffer_; QHash properties_; qlonglong bodySize_; + + private: + friend class QAMQP::QueuePrivate; }; class ContentBody : public Base @@ -398,22 +398,24 @@ namespace Frame class MethodHandler { public: - virtual void _q_method(const QAMQP::Frame::Method &frame) = 0; + virtual void _q_method(const Frame::Method &frame) = 0; }; class ContentHandler { public: - virtual void _q_content(const QAMQP::Frame::Content & frame) = 0; + virtual void _q_content(const Frame::Content & frame) = 0; }; class ContentBodyHandler { public: - virtual void _q_body(const QAMQP::Frame::ContentBody & frame) = 0; + virtual void _q_body(const Frame::ContentBody & frame) = 0; }; -} -} + +} // namespace Frame + +} // namespace QAMQP Q_DECLARE_METATYPE(QAMQP::Frame::decimal) Q_DECLARE_METATYPE(QAMQP::Frame::TableField) diff --git a/src/amqp_message.h b/src/amqp_message.h index 262c417..8a8e5c3 100644 --- a/src/amqp_message.h +++ b/src/amqp_message.h @@ -1,3 +1,6 @@ +#ifndef amqp_message_h__ +#define amqp_message_h__ + #include "amqp_frame.h" #include #include @@ -11,18 +14,20 @@ struct Message Message(); virtual ~Message(); - typedef QAMQP::Frame::Content::Property MessageProperty; + typedef Frame::Content::Property MessageProperty; Q_DECLARE_FLAGS(MessageProperties, MessageProperty) qlonglong deliveryTag; QByteArray payload; QHash property; - QAMQP::Frame::TableField headers; + Frame::TableField headers; QString routeKey; QString exchangeName; int leftSize; }; -typedef QSharedPointer MessagePtr; +typedef QSharedPointer MessagePtr; -} +} // namespace QAMQP + +#endif // amqp_message_h__ diff --git a/src/amqp_network.h b/src/amqp_network.h index 089932c..b7eab50 100644 --- a/src/amqp_network.h +++ b/src/amqp_network.h @@ -24,7 +24,7 @@ public: void disconnect(); void sendFrame(); - void sendFrame(const QAMQP::Frame::Base &frame); + void sendFrame(const Frame::Base &frame); bool isSsl() const; void setSsl(bool value); @@ -72,5 +72,6 @@ private: QHash > m_bodyHandlersByChannel; }; -} +} // namespace QAMQP + #endif // amqp_network_h__ diff --git a/src/amqp_queue.cpp b/src/amqp_queue.cpp index aab0786..1199913 100644 --- a/src/amqp_queue.cpp +++ b/src/amqp_queue.cpp @@ -3,7 +3,6 @@ #include "amqp_exchange.h" using namespace QAMQP; -using namespace QAMQP::Frame; #include #include @@ -13,7 +12,7 @@ using namespace QAMQP::Frame; Queue::Queue(int channelNumber, Client *parent) : Channel(new QueuePrivate(this), parent) { - Q_D(QAMQP::Queue); + Q_D(Queue); d->init(channelNumber, parent); } @@ -24,7 +23,7 @@ Queue::~Queue() void Queue::onOpen() { - Q_D(QAMQP::Queue); + Q_D(Queue); if (d->delayedDeclare) d->declare(); @@ -38,37 +37,37 @@ void Queue::onOpen() void Queue::onClose() { - Q_D(QAMQP::Queue); + Q_D(Queue); d->remove(true, true); } Queue::QueueOptions Queue::option() const { - Q_D(const QAMQP::Queue); + Q_D(const Queue); return d->options; } void Queue::setNoAck(bool noAck) { - Q_D(QAMQP::Queue); + Q_D(Queue); d->noAck = noAck; } bool Queue::noAck() const { - Q_D(const QAMQP::Queue); + Q_D(const Queue); return d->noAck; } void Queue::declare() { - Q_D(QAMQP::Queue); + Q_D(Queue); declare(d->name, QueueOptions(Durable | AutoDelete)); } void Queue::declare(const QString &name, QueueOptions options) { - Q_D(QAMQP::Queue); + Q_D(Queue); setName(name); d->options = options; d->declare(); @@ -76,63 +75,63 @@ void Queue::declare(const QString &name, QueueOptions options) void Queue::remove(bool ifUnused, bool ifEmpty, bool noWait) { - Q_D(QAMQP::Queue); + Q_D(Queue); d->remove(ifUnused, ifEmpty, noWait); } void Queue::purge() { - Q_D(QAMQP::Queue); + Q_D(Queue); d->purge(); } void Queue::bind(const QString &exchangeName, const QString &key) { - Q_D(QAMQP::Queue); + Q_D(Queue); d->bind(exchangeName, key); } void Queue::bind(Exchange *exchange, const QString &key) { - Q_D(QAMQP::Queue); + Q_D(Queue); if (exchange) d->bind(exchange->name(), key); } void Queue::unbind(const QString &exchangeName, const QString &key) { - Q_D(QAMQP::Queue); + Q_D(Queue); d->unbind(exchangeName, key); } void Queue::unbind(Exchange *exchange, const QString &key) { - Q_D(QAMQP::Queue); + Q_D(Queue); if (exchange) d->unbind(exchange->name(), key); } -void Queue::_q_content(const Content &frame) +void Queue::_q_content(const Frame::Content &frame) { - Q_D(QAMQP::Queue); + Q_D(Queue); d->_q_content(frame); } -void Queue::_q_body(const ContentBody &frame) +void Queue::_q_body(const Frame::ContentBody &frame) { - Q_D(QAMQP::Queue); + Q_D(Queue); d->_q_body(frame); } -QAMQP::MessagePtr Queue::getMessage() +MessagePtr Queue::getMessage() { - Q_D(QAMQP::Queue); + Q_D(Queue); return d->messages_.dequeue(); } bool Queue::hasMessage() const { - Q_D(const QAMQP::Queue); + Q_D(const Queue); if (d->messages_.isEmpty()) return false; @@ -142,31 +141,31 @@ bool Queue::hasMessage() const void Queue::consume(ConsumeOptions options) { - Q_D(QAMQP::Queue); + Q_D(Queue); d->consume(options); } void Queue::setConsumerTag(const QString &consumerTag) { - Q_D(QAMQP::Queue); + Q_D(Queue); d->consumerTag = consumerTag; } QString Queue::consumerTag() const { - Q_D(const QAMQP::Queue); + Q_D(const Queue); return d->consumerTag; } void Queue::get() { - Q_D(QAMQP::Queue); + Q_D(Queue); d->get(); } void Queue::ack(const MessagePtr &message) { - Q_D(QAMQP::Queue); + Q_D(Queue); d->ack(message); } @@ -186,13 +185,13 @@ QueuePrivate::~QueuePrivate() { } -bool QueuePrivate::_q_method(const QAMQP::Frame::Method &frame) +bool QueuePrivate::_q_method(const Frame::Method &frame) { - Q_Q(QAMQP::Queue); + Q_Q(Queue); if (ChannelPrivate::_q_method(frame)) return true; - if (frame.methodClass() == QAMQP::Frame::fcQueue) { + if (frame.methodClass() == Frame::fcQueue) { switch (frame.id()) { case miDeclareOk: declareOk(frame); @@ -216,7 +215,7 @@ bool QueuePrivate::_q_method(const QAMQP::Frame::Method &frame) return true; } - if (frame.methodClass() == QAMQP::Frame::fcBasic) { + if (frame.methodClass() == Frame::fcBasic) { switch(frame.id()) { case bmConsumeOk: consumeOk(frame); @@ -239,16 +238,16 @@ bool QueuePrivate::_q_method(const QAMQP::Frame::Method &frame) return false; } -void QueuePrivate::declareOk(const QAMQP::Frame::Method &frame) +void QueuePrivate::declareOk(const Frame::Method &frame) { - Q_Q(QAMQP::Queue); + Q_Q(Queue); qDebug() << "Declared queue: " << name; declared = true; QByteArray data = frame.arguments(); QDataStream stream(&data, QIODevice::ReadOnly); - name = readField('s', stream).toString(); + name = Frame::readField('s', stream).toString(); qint32 messageCount = 0, consumerCount = 0; stream >> messageCount >> consumerCount; qDebug("Message count %d\nConsumer count: %d", messageCount, consumerCount); @@ -256,9 +255,9 @@ void QueuePrivate::declareOk(const QAMQP::Frame::Method &frame) QMetaObject::invokeMethod(q, "declared"); } -void QueuePrivate::deleteOk(const QAMQP::Frame::Method &frame) +void QueuePrivate::deleteOk(const Frame::Method &frame) { - Q_Q(QAMQP::Queue); + Q_Q(Queue); qDebug() << "Deleted or purged queue: " << name; declared = false; @@ -270,19 +269,19 @@ void QueuePrivate::deleteOk(const QAMQP::Frame::Method &frame) QMetaObject::invokeMethod(q, "removed"); } -void QueuePrivate::bindOk(const QAMQP::Frame::Method &frame) +void QueuePrivate::bindOk(const Frame::Method &frame) { Q_UNUSED(frame) - Q_Q(QAMQP::Queue); + Q_Q(Queue); qDebug() << "Binded to queue: " << name; QMetaObject::invokeMethod(q, "binded", Q_ARG(bool, true)); } -void QueuePrivate::unbindOk(const QAMQP::Frame::Method &frame) +void QueuePrivate::unbindOk(const Frame::Method &frame) { Q_UNUSED(frame) - Q_Q(QAMQP::Queue); + Q_Q(Queue); qDebug() << "Unbinded queue: " << name; QMetaObject::invokeMethod(q, "binded", Q_ARG(bool, false)); @@ -295,14 +294,14 @@ void QueuePrivate::declare() return; } - QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miDeclare); + Frame::Method frame(Frame::fcQueue, miDeclare); frame.setChannel(number); QByteArray arguments_; QDataStream out(&arguments_, QIODevice::WriteOnly); out << qint16(0); //reserver 1 - writeField('s', out, name); + Frame::writeField('s', out, name); out << qint8(options); - writeField('F', out, TableField()); + Frame::writeField('F', out, Frame::TableField()); frame.setArguments(arguments_); sendFrame(frame); @@ -314,13 +313,13 @@ void QueuePrivate::remove(bool ifUnused, bool ifEmpty, bool noWait) if (!declared) return; - QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miDelete); + Frame::Method frame(Frame::fcQueue, miDelete); frame.setChannel(number); QByteArray arguments_; QDataStream out(&arguments_, QIODevice::WriteOnly); out << qint16(0); //reserver 1 - writeField('s', out, name); + Frame::writeField('s', out, name); qint8 flag = 0; @@ -339,12 +338,12 @@ void QueuePrivate::purge() if (!opened) return; - QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miPurge); + Frame::Method frame(Frame::fcQueue, miPurge); frame.setChannel(number); QByteArray arguments_; QDataStream out(&arguments_, QIODevice::WriteOnly); out << qint16(0); //reserver 1 - writeField('s', out, name); + Frame::writeField('s', out, name); out << qint8(0); // no-wait frame.setArguments(arguments_); sendFrame(frame); @@ -357,16 +356,16 @@ void QueuePrivate::bind(const QString & exchangeName, const QString &key) return; } - QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miBind); + Frame::Method frame(Frame::fcQueue, miBind); frame.setChannel(number); QByteArray arguments_; QDataStream out(&arguments_, QIODevice::WriteOnly); out << qint16(0); //reserver 1 - writeField('s', out, name); - writeField('s', out, exchangeName); - writeField('s', out, key); + Frame::writeField('s', out, name); + Frame::writeField('s', out, exchangeName); + Frame::writeField('s', out, key); out << qint8(0); // no-wait - writeField('F', out, TableField()); + Frame::writeField('F', out, Frame::TableField()); frame.setArguments(arguments_); sendFrame(frame); @@ -377,15 +376,15 @@ void QueuePrivate::unbind(const QString &exchangeName, const QString &key) if (!opened) return; - QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miUnbind); + Frame::Method frame(Frame::fcQueue, miUnbind); frame.setChannel(number); QByteArray arguments_; QDataStream out(&arguments_, QIODevice::WriteOnly); out << qint16(0); //reserver 1 - writeField('s', out, name); - writeField('s', out, exchangeName); - writeField('s', out, key); - writeField('F', out, TableField()); + Frame::writeField('s', out, name); + Frame::writeField('s', out, exchangeName); + Frame::writeField('s', out, key); + Frame::writeField('F', out, Frame::TableField()); frame.setArguments(arguments_); sendFrame(frame); @@ -396,27 +395,27 @@ void QueuePrivate::get() if (!opened) return; - QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmGet); + Frame::Method frame(Frame::fcBasic, bmGet); frame.setChannel(number); QByteArray arguments_; QDataStream out(&arguments_, QIODevice::WriteOnly); out << qint16(0); //reserver 1 - writeField('s', out, name); + Frame::writeField('s', out, name); out << qint8(noAck ? 1 : 0); // noAck frame.setArguments(arguments_); sendFrame(frame); } -void QueuePrivate::getOk(const QAMQP::Frame::Method &frame) +void QueuePrivate::getOk(const Frame::Method &frame) { QByteArray data = frame.arguments(); QDataStream in(&data, QIODevice::ReadOnly); - qlonglong deliveryTag = readField('L',in).toLongLong(); - bool redelivered = readField('t',in).toBool(); - QString exchangeName = readField('s',in).toString(); - QString routingKey = readField('s',in).toString(); + qlonglong deliveryTag = Frame::readField('L',in).toLongLong(); + bool redelivered = Frame::readField('t',in).toBool(); + QString exchangeName = Frame::readField('s',in).toString(); + QString routingKey = Frame::readField('s',in).toString(); Q_UNUSED(redelivered) @@ -432,7 +431,7 @@ void QueuePrivate::ack(const MessagePtr &Message) if (!opened) return; - QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmAck); + Frame::Method frame(Frame::fcBasic, bmAck); frame.setChannel(number); QByteArray arguments_; QDataStream out(&arguments_, QIODevice::WriteOnly); @@ -448,43 +447,43 @@ void QueuePrivate::consume(Queue::ConsumeOptions options) if (!opened) return; - QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmConsume); + Frame::Method frame(Frame::fcBasic, bmConsume); frame.setChannel(number); QByteArray arguments_; QDataStream out(&arguments_, QIODevice::WriteOnly); out << qint16(0); //reserver 1 - writeField('s', out, name); - writeField('s', out, consumerTag); + Frame::writeField('s', out, name); + Frame::writeField('s', out, consumerTag); out << qint8(options); // no-wait - writeField('F', out, TableField()); + Frame::writeField('F', out, Frame::TableField()); frame.setArguments(arguments_); sendFrame(frame); } -void QueuePrivate::consumeOk(const QAMQP::Frame::Method &frame) +void QueuePrivate::consumeOk(const Frame::Method &frame) { qDebug() << "Consume ok: " << name; declared = false; QByteArray data = frame.arguments(); QDataStream stream(&data, QIODevice::ReadOnly); - consumerTag = readField('s',stream).toString(); + consumerTag = Frame::readField('s',stream).toString(); qDebug("Consumer tag = %s", qPrintable(consumerTag)); } -void QueuePrivate::deliver(const QAMQP::Frame::Method &frame) +void QueuePrivate::deliver(const Frame::Method &frame) { QByteArray data = frame.arguments(); QDataStream in(&data, QIODevice::ReadOnly); - QString consumer_ = readField('s',in).toString(); + QString consumer_ = Frame::readField('s',in).toString(); if (consumer_ != consumerTag) return; - qlonglong deliveryTag = readField('L',in).toLongLong(); - bool redelivered = readField('t',in).toBool(); - QString exchangeName = readField('s',in).toString(); - QString routingKey = readField('s',in).toString(); + qlonglong deliveryTag = Frame::readField('L',in).toLongLong(); + bool redelivered = Frame::readField('t',in).toBool(); + QString exchangeName = Frame::readField('s',in).toString(); + QString routingKey = Frame::readField('s',in).toString(); Q_UNUSED(redelivered) @@ -495,7 +494,7 @@ void QueuePrivate::deliver(const QAMQP::Frame::Method &frame) messages_.enqueue(newMessage); } -void QueuePrivate::_q_content(const QAMQP::Frame::Content &frame) +void QueuePrivate::_q_content(const Frame::Content &frame) { Q_ASSERT(frame.channel() == number); if (frame.channel() != number) @@ -513,9 +512,9 @@ void QueuePrivate::_q_content(const QAMQP::Frame::Content &frame) message->property[Message::MessageProperty(i.key())]= i.value(); } -void QueuePrivate::_q_body(const QAMQP::Frame::ContentBody &frame) +void QueuePrivate::_q_body(const Frame::ContentBody &frame) { - Q_Q(QAMQP::Queue); + Q_Q(Queue); Q_ASSERT(frame.channel() == number); if (frame.channel() != number) return; diff --git a/src/amqp_queue.h b/src/amqp_queue.h index 84b03c9..233f113 100644 --- a/src/amqp_queue.h +++ b/src/amqp_queue.h @@ -68,7 +68,7 @@ Q_SIGNALS: void declared(); void binded(bool); void removed(); - void messageReceived(QAMQP::Queue *pQueue); + void messageReceived(Queue *pQueue); void empty(); protected: @@ -78,18 +78,15 @@ protected: private: Queue(int channelNumber = -1, Client * parent = 0); - void _q_content(const QAMQP::Frame::Content & frame); - void _q_body(const QAMQP::Frame::ContentBody & frame); + void _q_content(const Frame::Content & frame); + void _q_body(const Frame::ContentBody & frame); Q_DISABLE_COPY(Queue) - Q_DECLARE_PRIVATE(QAMQP::Queue) + Q_DECLARE_PRIVATE(Queue) friend class ClientPrivate; - }; -} -#ifdef QAMQP_P_INCLUDE -# include "amqp_queue_p.h" +} // namespace QAMQP + #endif -#endif // amqp_queue_h__ diff --git a/src/amqp_queue_p.h b/src/amqp_queue_p.h index 2733e2e..1facf9e 100644 --- a/src/amqp_queue_p.h +++ b/src/amqp_queue_p.h @@ -1,15 +1,14 @@ #ifndef amqp_queue_p_h__ #define amqp_queue_p_h__ -#include "amqp_channel_p.h" -#define METHOD_ID_ENUM(name, id) name = id, name ## Ok #include +#include "amqp_channel_p.h" + +#define METHOD_ID_ENUM(name, id) name = id, name ## Ok namespace QAMQP { -using namespace QAMQP::Frame; - class QueuePrivate: public ChannelPrivate { public: @@ -30,27 +29,27 @@ public: void bind(const QString &exchangeName, const QString &key); void unbind(const QString &exchangeName, const QString &key); - void declareOk(const QAMQP::Frame::Method &frame); - void deleteOk(const QAMQP::Frame::Method &frame); - void bindOk(const QAMQP::Frame::Method &frame); - void unbindOk(const QAMQP::Frame::Method &frame); + void declareOk(const Frame::Method &frame); + void deleteOk(const Frame::Method &frame); + void bindOk(const Frame::Method &frame); + void unbindOk(const Frame::Method &frame); /************************************************************************/ /* CLASS BASIC METHODS */ /************************************************************************/ void consume(Queue::ConsumeOptions options); - void consumeOk(const QAMQP::Frame::Method &frame); - void deliver(const QAMQP::Frame::Method &frame); + void consumeOk(const Frame::Method &frame); + void deliver(const Frame::Method &frame); void get(); - void getOk(const QAMQP::Frame::Method &frame); + void getOk(const Frame::Method &frame); void ack(const MessagePtr &Message); QString type; Queue::QueueOptions options; - bool _q_method(const QAMQP::Frame::Method &frame); + bool _q_method(const Frame::Method &frame); bool delayedDeclare; bool declared; @@ -58,16 +57,17 @@ public: QString consumerTag; QQueue > delayedBindings; - QQueue messages_; + QQueue messages_; bool recievingMessage; - void _q_content(const QAMQP::Frame::Content &frame); - void _q_body(const QAMQP::Frame::ContentBody &frame); + void _q_content(const Frame::Content &frame); + void _q_body(const Frame::ContentBody &frame); - Q_DECLARE_PUBLIC(QAMQP::Queue) + Q_DECLARE_PUBLIC(Queue) }; -} +} // namespace QAMQP + #endif // amqp_queue_p_h__