namespace cleanup

This commit is contained in:
Matt Broadstone 2014-05-28 13:52:27 -04:00
parent cd33957147
commit 64f23fd648
20 changed files with 385 additions and 376 deletions

View File

@ -4,8 +4,6 @@
#include <QString>
#include <QDataStream>
#include "amqp_global.h"
namespace QAMQP
{
@ -38,5 +36,6 @@ private:
};
}
} // namespace QAMQP
#endif // amqp_authenticator_h__

View File

@ -1,11 +1,9 @@
#include "amqp_channel.h"
#include "amqp_channel_p.h"
#include "amqp_connection_p.h"
#include "amqp_client.h"
#include "amqp_client_p.h"
#include "amqp_connection_p.h"
#include <QCoreApplication>
#include <QDebug>
#include <QDataStream>
@ -13,59 +11,58 @@ using namespace QAMQP;
//////////////////////////////////////////////////////////////////////////
QAMQP::Channel::Channel(int channelNumber, Client *parent)
Channel::Channel(int channelNumber, Client *parent)
: QObject(parent),
d_ptr(new ChannelPrivate(this))
{
Q_D(QAMQP::Channel);
Q_D(Channel);
d->init(channelNumber, parent);
}
QAMQP::Channel::Channel(ChannelPrivate *dd, Client *parent)
Channel::Channel(ChannelPrivate *dd, Client *parent)
: QObject(parent),
d_ptr(dd)
{
}
QAMQP::Channel::~Channel()
Channel::~Channel()
{
}
void QAMQP::Channel::closeChannel()
void Channel::closeChannel()
{
Q_D(QAMQP::Channel);
Q_D(Channel);
d->needOpen = true;
if (d->opened)
d->close(0, QString(), 0,0);
}
void QAMQP::Channel::reopen()
void Channel::reopen()
{
Q_D(QAMQP::Channel);
Q_D(Channel);
closeChannel();
d->open();
}
QString QAMQP::Channel::name() const
QString Channel::name() const
{
Q_D(const QAMQP::Channel);
Q_D(const Channel);
return d->name;
}
int QAMQP::Channel::channelNumber() const
int Channel::channelNumber() const
{
Q_D(const QAMQP::Channel);
Q_D(const Channel);
return d->number;
}
void QAMQP::Channel::setName(const QString &name)
void Channel::setName(const QString &name)
{
Q_D(QAMQP::Channel);
Q_D(Channel);
d->name = name;
}
void QAMQP::Channel::stateChanged(int state)
void Channel::stateChanged(int state)
{
switch(ChannelPrivate::State(state)) {
case ChannelPrivate::csOpened:
@ -83,29 +80,29 @@ void QAMQP::Channel::stateChanged(int state)
}
}
void QAMQP::Channel::_q_method(const Frame::Method &frame)
void Channel::_q_method(const Frame::Method &frame)
{
Q_D(QAMQP::Channel);
Q_D(Channel);
d->_q_method(frame);
}
bool QAMQP::Channel::isOpened() const
bool Channel::isOpened() const
{
Q_D(const QAMQP::Channel);
Q_D(const Channel);
return d->opened;
}
void QAMQP::Channel::onOpen()
void Channel::onOpen()
{
}
void QAMQP::Channel::onClose()
void Channel::onClose()
{
}
void QAMQP::Channel::setQOS(qint32 prefetchSize, quint16 prefetchCount)
void Channel::setQOS(qint32 prefetchSize, quint16 prefetchCount)
{
Q_D(QAMQP::Channel);
Q_D(Channel);
d->setQOS(prefetchSize, prefetchCount);
}
@ -126,7 +123,7 @@ ChannelPrivate::~ChannelPrivate()
void ChannelPrivate::init(int channelNumber, Client *parent)
{
Q_Q(QAMQP::Channel);
Q_Q(Channel);
needOpen = channelNumber == -1 ? true : false;
number = channelNumber == -1 ? ++nextChannelNumber_ : channelNumber;
nextChannelNumber_ = qMax(channelNumber, (nextChannelNumber_ + 1));
@ -134,14 +131,13 @@ void ChannelPrivate::init(int channelNumber, Client *parent)
client_ = parent;
}
bool ChannelPrivate::_q_method(const QAMQP::Frame::Method &frame)
bool ChannelPrivate::_q_method(const Frame::Method &frame)
{
Q_ASSERT(frame.channel() == number);
if (frame.channel() != number)
return true;
if (frame.methodClass() != QAMQP::Frame::fcChannel)
if (frame.methodClass() != Frame::fcChannel)
return false;
qDebug("Channel#%d:", number);
@ -171,12 +167,10 @@ void ChannelPrivate::_q_open()
open();
}
void ChannelPrivate::sendFrame(const QAMQP::Frame::Base &frame)
void ChannelPrivate::sendFrame(const Frame::Base &frame)
{
if (client_) {
if (client_)
client_->d_func()->network_->sendFrame(frame);
}
}
void ChannelPrivate::open()
@ -188,7 +182,7 @@ void ChannelPrivate::open()
return;
qDebug("Open channel #%d", number);
QAMQP::Frame::Method frame(QAMQP::Frame::fcChannel, miOpen);
Frame::Method frame(Frame::fcChannel, miOpen);
frame.setChannel(number);
QByteArray arguments_;
arguments_.resize(1);
@ -201,7 +195,7 @@ void ChannelPrivate::flow()
{
}
void ChannelPrivate::flow(const QAMQP::Frame::Method &frame)
void ChannelPrivate::flow(const Frame::Method &frame)
{
Q_UNUSED(frame);
}
@ -210,21 +204,21 @@ void ChannelPrivate::flowOk()
{
}
void ChannelPrivate::flowOk(const QAMQP::Frame::Method &frame)
void ChannelPrivate::flowOk(const Frame::Method &frame)
{
Q_UNUSED(frame);
}
void ChannelPrivate::close(int code, const QString &text, int classId, int methodId)
{
QAMQP::Frame::Method frame(QAMQP::Frame::fcChannel, miClose);
Frame::Method frame(Frame::fcChannel, miClose);
QByteArray arguments_;
QDataStream stream(&arguments_, QIODevice::WriteOnly);
QAMQP::Frame::writeField('s',stream, client_->virtualHost());
Frame::writeField('s',stream, client_->virtualHost());
stream << qint16(code);
QAMQP::Frame::writeField('s', stream, text);
Frame::writeField('s', stream, text);
stream << qint16(classId);
stream << qint16(methodId);
@ -232,9 +226,9 @@ void ChannelPrivate::close(int code, const QString &text, int classId, int metho
client_->d_func()->network_->sendFrame(frame);
}
void ChannelPrivate::close(const QAMQP::Frame::Method &frame)
void ChannelPrivate::close(const Frame::Method &frame)
{
Q_Q(QAMQP::Channel);
Q_Q(Channel);
q->stateChanged(csClosed);
qDebug(">> CLOSE");
@ -242,7 +236,7 @@ void ChannelPrivate::close(const QAMQP::Frame::Method &frame)
QDataStream stream(&data, QIODevice::ReadOnly);
qint16 code_ = 0, classId, methodId;
stream >> code_;
QString text(QAMQP::Frame::readField('s', stream).toString());
QString text(Frame::readField('s', stream).toString());
stream >> classId;
stream >> methodId;
@ -254,24 +248,24 @@ void ChannelPrivate::close(const QAMQP::Frame::Method &frame)
void ChannelPrivate::closeOk()
{
QAMQP::Frame::Method frame(QAMQP::Frame::fcChannel, miCloseOk);
Frame::Method frame(Frame::fcChannel, miCloseOk);
sendFrame(frame);
}
void ChannelPrivate::closeOk(const QAMQP::Frame::Method &frame)
void ChannelPrivate::closeOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(QAMQP::Channel);
Q_Q(Channel);
q->stateChanged(csClosed);
q->onClose();
opened = false;
}
void ChannelPrivate::openOk(const QAMQP::Frame::Method &frame)
void ChannelPrivate::openOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(QAMQP::Channel);
Q_Q(Channel);
qDebug(">> OpenOK");
opened = true;

View File

@ -2,7 +2,6 @@
#define amqp_channel_h__
#include <QObject>
#include "amqp_global.h"
#include "amqp_frame.h"
namespace QAMQP
@ -13,46 +12,48 @@ class ChannelPrivate;
class Channel : public QObject, public Frame::MethodHandler
{
Q_OBJECT
Q_PROPERTY(int number READ channelNumber)
Q_PROPERTY(int number READ channelNumber CONSTANT)
Q_PROPERTY(bool opened READ isOpened CONSTANT)
Q_PROPERTY(QString name READ name WRITE setName)
public:
~Channel();
virtual ~Channel();
void closeChannel();
void reopen();
QString name() const;
int channelNumber() const;
void setName(const QString &name);
void setQOS(qint32 prefetchSize, quint16 prefetchCount);
bool isOpened() const;
signals:
QString name() const;
void setName(const QString &name);
public Q_SLOTS:
void closeChannel();
void reopen();
void setQOS(qint32 prefetchSize, quint16 prefetchCount);
Q_SIGNALS:
void opened();
void closed();
void flowChanged(bool enabled);
protected:
Q_DISABLE_COPY(Channel)
Q_DECLARE_PRIVATE(QAMQP::Channel)
Channel(int channelNumber = -1, Client *parent = 0);
Channel(ChannelPrivate *dd, Client *parent = 0);
Q_DISABLE_COPY(Channel)
Q_DECLARE_PRIVATE(Channel)
QScopedPointer<ChannelPrivate> d_ptr;
Q_PRIVATE_SLOT(d_func(), void _q_open())
Q_PRIVATE_SLOT(d_func(), void _q_disconnected())
// should move to private classes
virtual void onOpen();
virtual void onClose();
void stateChanged(int state);
void _q_method(const QAMQP::Frame::Method &frame);
void _q_method(const Frame::Method &frame);
friend class ClientPrivate;
};
}
} // namespace QAMQP
#endif

View File

@ -1,7 +1,6 @@
#ifndef amqp_channel_p_h__
#define amqp_channel_p_h__
#include "amqp_global.h"
#include <QPointer>
#define METHOD_ID_ENUM(name, id) name = id, name ## Ok
@ -81,5 +80,6 @@ public:
Channel * const q_ptr;
};
}
} // namespace QAMQP
#endif // amqp_channel_p_h__

View File

@ -1,8 +1,6 @@
#include "amqp_client.h"
#include "amqp_client_p.h"
#include <QTextStream>
#include <QCoreApplication>
#include "amqp_global.h"
#include "amqp_exchange.h"
#include "amqp_exchange_p.h"
#include "amqp_queue.h"
@ -10,6 +8,8 @@
#include "amqp_connection_p.h"
#include "amqp_authenticator.h"
#include <QTextStream>
using namespace QAMQP;
ClientPrivate::ClientPrivate(Client * q)

View File

@ -4,8 +4,6 @@
#include <QObject>
#include <QUrl>
#include "amqp_global.h"
namespace QAMQP
{

View File

@ -3,7 +3,6 @@
#include <QSharedPointer>
#include "amqp_global.h"
#include "amqp_network.h"
#include "amqp_connection.h"
#include "amqp_authenticator.h"
@ -47,5 +46,6 @@ public:
};
}
} // namespace QAMQP
#endif // amqp_client_p_h__

View File

@ -3,11 +3,10 @@
#include "amqp_client.h"
#include "amqp_client_p.h"
#include "amqp_frame.h"
#include "amqp_global.h"
#include <QCoreApplication>
#include <QDebug>
#include <QDataStream>
#include <QVariant>
#include <QTimer>
using namespace QAMQP;
@ -25,7 +24,7 @@ ConnectionPrivate::~ConnectionPrivate()
void ConnectionPrivate::init(Client * parent)
{
Q_Q(QAMQP::Connection);
Q_Q(Connection);
q->setParent(parent);
client_ = parent;
heartbeatTimer_ = new QTimer(parent);
@ -34,19 +33,19 @@ void ConnectionPrivate::init(Client * parent)
void ConnectionPrivate::startOk()
{
QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miStartOk);
Frame::Method frame(Frame::fcConnection, miStartOk);
QByteArray arguments_;
QDataStream stream(&arguments_, QIODevice::WriteOnly);
QAMQP::Frame::TableField clientProperties;
Frame::TableField clientProperties;
clientProperties["version"] = QString(QAMQP_VERSION);
clientProperties["platform"] = QString("Qt %1").arg(qVersion());
clientProperties["product"] = QString("QAMQP");
clientProperties.unite(customProperty);
QAMQP::Frame::serialize(stream, clientProperties);
Frame::serialize(stream, clientProperties);
client_->d_func()->auth_->write(stream);
QAMQP::Frame::writeField('s', stream, "en_US");
Frame::writeField('s', stream, "en_US");
frame.setArguments(arguments_);
client_->d_func()->network_->sendFrame(frame);
}
@ -57,7 +56,7 @@ void ConnectionPrivate::secureOk()
void ConnectionPrivate::tuneOk()
{
QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miTuneOk);
Frame::Method frame(Frame::fcConnection, miTuneOk);
QByteArray arguments_;
QDataStream stream(&arguments_, QIODevice::WriteOnly);
@ -71,11 +70,11 @@ void ConnectionPrivate::tuneOk()
void ConnectionPrivate::open()
{
QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miOpen);
Frame::Method frame(Frame::fcConnection, miOpen);
QByteArray arguments_;
QDataStream stream(&arguments_, QIODevice::WriteOnly);
QAMQP::Frame::writeField('s',stream, client_->virtualHost());
Frame::writeField('s',stream, client_->virtualHost());
stream << qint8(0);
stream << qint8(0);
@ -84,7 +83,7 @@ void ConnectionPrivate::open()
client_->d_func()->network_->sendFrame(frame);
}
void ConnectionPrivate::start(const QAMQP::Frame::Method &frame)
void ConnectionPrivate::start(const Frame::Method &frame)
{
qDebug(">> Start");
QByteArray data = frame.arguments();
@ -94,16 +93,16 @@ void ConnectionPrivate::start(const QAMQP::Frame::Method &frame)
stream >> version_major >> version_minor;
QAMQP::Frame::TableField table;
QAMQP::Frame::deserialize(stream, table);
Frame::TableField table;
Frame::deserialize(stream, table);
QString mechanisms = QAMQP::Frame::readField('S', stream).toString();
QString locales = QAMQP::Frame::readField('S', stream).toString();
QString mechanisms = Frame::readField('S', stream).toString();
QString locales = Frame::readField('S', stream).toString();
qDebug(">> version_major: %d", version_major);
qDebug(">> version_minor: %d", version_minor);
QAMQP::Frame::print(table);
Frame::print(table);
qDebug(">> mechanisms: %s", qPrintable(mechanisms));
qDebug(">> locales: %s", qPrintable(locales));
@ -111,19 +110,19 @@ void ConnectionPrivate::start(const QAMQP::Frame::Method &frame)
startOk();
}
void ConnectionPrivate::secure(const QAMQP::Frame::Method &frame)
void ConnectionPrivate::secure(const Frame::Method &frame)
{
Q_UNUSED(frame)
}
void ConnectionPrivate::tune(const QAMQP::Frame::Method &frame)
void ConnectionPrivate::tune(const Frame::Method &frame)
{
qDebug(">> Tune");
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
qint16 channel_max = 0,
heartbeat = 0;
heartbeat = 0;
qint32 frame_max = 0;
stream >> channel_max;
@ -146,26 +145,26 @@ void ConnectionPrivate::tune(const QAMQP::Frame::Method &frame)
open();
}
void ConnectionPrivate::openOk(const QAMQP::Frame::Method &frame)
void ConnectionPrivate::openOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(QAMQP::Connection);
Q_Q(Connection);
qDebug(">> OpenOK");
connected = true;
q->openOk();
}
void ConnectionPrivate::close(const QAMQP::Frame::Method &frame)
void ConnectionPrivate::close(const Frame::Method &frame)
{
Q_Q(QAMQP::Connection);
Q_Q(Connection);
qDebug(">> CLOSE");
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
qint16 code_ = 0, classId, methodId;
stream >> code_;
QString text(QAMQP::Frame::readField('s', stream).toString());
QString text(Frame::readField('s', stream).toString());
stream >> classId;
stream >> methodId;
@ -180,14 +179,14 @@ void ConnectionPrivate::close(const QAMQP::Frame::Method &frame)
void ConnectionPrivate::close(int code, const QString &text, int classId, int methodId)
{
QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miClose);
Frame::Method frame(Frame::fcConnection, miClose);
QByteArray arguments_;
QDataStream stream(&arguments_, QIODevice::WriteOnly);
QAMQP::Frame::writeField('s',stream, client_->virtualHost());
Frame::writeField('s',stream, client_->virtualHost());
stream << qint16(code);
QAMQP::Frame::writeField('s', stream, text);
Frame::writeField('s', stream, text);
stream << qint16(classId);
stream << qint16(methodId);
@ -197,16 +196,15 @@ void ConnectionPrivate::close(int code, const QString &text, int classId, int me
void ConnectionPrivate::closeOk()
{
QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miCloseOk);
Frame::Method frame(Frame::fcConnection, miCloseOk);
connected = false;
client_->d_func()->network_->sendFrame(frame);
}
void ConnectionPrivate::closeOk(const QAMQP::Frame::Method &frame)
void ConnectionPrivate::closeOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(QAMQP::Connection);
Q_Q(Connection);
connected = false;
QMetaObject::invokeMethod(q, "disconnected");
@ -214,10 +212,9 @@ void ConnectionPrivate::closeOk(const QAMQP::Frame::Method &frame)
heartbeatTimer_->stop();
}
void ConnectionPrivate::setQOS(qint32 prefetchSize, quint16 prefetchCount, int channel, bool global)
{
QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, 10);
Frame::Method frame(Frame::fcBasic, 10);
frame.setChannel(channel);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
@ -230,10 +227,10 @@ void ConnectionPrivate::setQOS(qint32 prefetchSize, quint16 prefetchCount, int c
client_->d_func()->network_->sendFrame(frame);
}
bool ConnectionPrivate::_q_method(const QAMQP::Frame::Method &frame)
bool ConnectionPrivate::_q_method(const Frame::Method &frame)
{
Q_ASSERT(frame.methodClass() == QAMQP::Frame::fcConnection);
if (frame.methodClass() != QAMQP::Frame::fcConnection)
Q_ASSERT(frame.methodClass() == Frame::fcConnection);
if (frame.methodClass() != Frame::fcConnection)
return true;
qDebug() << "Connection:";
@ -274,7 +271,7 @@ bool ConnectionPrivate::_q_method(const QAMQP::Frame::Method &frame)
void ConnectionPrivate::_q_heartbeat()
{
QAMQP::Frame::Heartbeat frame;
Frame::Heartbeat frame;
client_->d_func()->network_->sendFrame(frame);
}
@ -284,7 +281,7 @@ Connection::Connection(Client *parent)
: QObject(parent),
d_ptr(new ConnectionPrivate(this))
{
Q_D(QAMQP::Connection);
Q_D(Connection);
d->init(parent);
}
@ -294,37 +291,37 @@ Connection::~Connection()
void Connection::startOk()
{
Q_D(QAMQP::Connection);
Q_D(Connection);
d->startOk();
}
void Connection::secureOk()
{
Q_D(QAMQP::Connection);
Q_D(Connection);
d->secureOk();
}
void Connection::tuneOk()
{
Q_D(QAMQP::Connection);
Q_D(Connection);
d->tuneOk();
}
void Connection::open()
{
Q_D(QAMQP::Connection);
Q_D(Connection);
d->open();
}
void Connection::close(int code, const QString &text, int classId , int methodId)
{
Q_D(QAMQP::Connection);
Q_D(Connection);
d->close(code, text, classId, methodId);
}
void Connection::closeOk()
{
Q_D(QAMQP::Connection);
Q_D(Connection);
d->closeOk();
Q_EMIT disconnect();
}
@ -334,33 +331,33 @@ void Connection::openOk()
Q_EMIT connected();
}
void Connection::_q_method(const QAMQP::Frame::Method &frame)
void Connection::_q_method(const Frame::Method &frame)
{
Q_D(QAMQP::Connection);
Q_D(Connection);
d->_q_method(frame);
}
bool Connection::isConnected() const
{
Q_D(const QAMQP::Connection);
Q_D(const Connection);
return d->connected;
}
void Connection::setQOS(qint32 prefetchSize, quint16 prefetchCount)
{
Q_D(QAMQP::Connection);
Q_D(Connection);
d->setQOS(prefetchSize, prefetchCount, 0, true);
}
void Connection::addCustomProperty(const QString &name, const QString &value)
{
Q_D(QAMQP::Connection);
Q_D(Connection);
d->customProperty[name] = value;
}
QString Connection::customProperty(const QString &name) const
{
Q_D(const QAMQP::Connection);
Q_D(const Connection);
if (d->customProperty.contains(name))
return d->customProperty.value(name).toString();
return QString();

View File

@ -3,7 +3,6 @@
#include <QObject>
#include "amqp_frame.h"
#include "amqp_global.h"
namespace QAMQP
{
@ -15,12 +14,17 @@ class ConnectionPrivate;
class Connection : public QObject, public Frame::MethodHandler
{
Q_OBJECT
Q_PROPERTY(bool connected READ isConnected CONSTANT)
public:
virtual ~Connection();
void addCustomProperty(const QString &name, const QString &value);
QString customProperty(const QString &name) const;
bool isConnected() const;
public Q_SLOTS:
void startOk();
void secureOk();
void tuneOk();
@ -28,8 +32,6 @@ public:
void close(int code, const QString &text, int classId = 0, int methodId = 0);
void closeOk();
bool isConnected() const;
void setQOS(qint32 prefetchSize, quint16 prefetchCount);
Q_SIGNALS:
@ -37,20 +39,21 @@ Q_SIGNALS:
void connected();
private:
explicit Connection(Client * parent = 0);
Q_DISABLE_COPY(Connection)
Q_DECLARE_PRIVATE(Connection)
QScopedPointer<ConnectionPrivate> d_ptr;
Connection(Client * parent = 0);
void openOk();
Q_PRIVATE_SLOT(d_func(), void _q_heartbeat())
friend class ClientPrivate;
friend class ChannelPrivate;
void _q_method(const QAMQP::Frame::Method &frame);
Q_PRIVATE_SLOT(d_func(), void _q_heartbeat())
// should be moved to private
void openOk();
void _q_method(const Frame::Method &frame);
};
}
} // namespace QAMQP
#endif // amqp_connection_h__

View File

@ -1,14 +1,14 @@
#ifndef amqp_connection_p_h__
#define amqp_connection_p_h__
#define METHOD_ID_ENUM(name, id) name = id, name ## Ok
#include <QPointer>
class QTimer;
#define METHOD_ID_ENUM(name, id) name = id, name ## Ok
class QTimer;
namespace QAMQP
{
class Client;
class ClientPrivate;
class Connection;
@ -34,14 +34,14 @@ public:
void close(int code, const QString & text, int classId = 0, int methodId = 0);
void closeOk();
void start(const QAMQP::Frame::Method &frame);
void secure(const QAMQP::Frame::Method &frame);
void tune(const QAMQP::Frame::Method &frame);
void openOk(const QAMQP::Frame::Method &frame);
void close(const QAMQP::Frame::Method &frame);
void closeOk(const QAMQP::Frame::Method &frame);
void start(const Frame::Method &frame);
void secure(const Frame::Method &frame);
void tune(const Frame::Method &frame);
void openOk(const Frame::Method &frame);
void close(const Frame::Method &frame);
void closeOk(const Frame::Method &frame);
bool _q_method(const QAMQP::Frame::Method &frame);
bool _q_method(const Frame::Method &frame);
void _q_heartbeat();
void setQOS(qint32 prefetchSize, quint16 prefetchCount, int channel, bool global);
@ -51,12 +51,12 @@ public:
bool connected;
QPointer<QTimer> heartbeatTimer_;
QAMQP::Frame::TableField customProperty;
Frame::TableField customProperty;
Q_DECLARE_PUBLIC(QAMQP::Connection)
Q_DECLARE_PUBLIC(Connection)
Connection * const q_ptr;
};
}
} // namespace QAMQP
#endif // amqp_connection_p_h__

View File

@ -1,18 +1,17 @@
#include "amqp_exchange.h"
#include "amqp_exchange_p.h"
#include "amqp_queue.h"
#include "amqp_global.h"
#include <QDataStream>
#include <QDebug>
using namespace QAMQP;
using namespace QAMQP::Frame;
#include <QCoreApplication>
#include <QDebug>
#include <QDataStream>
Exchange::Exchange(int channelNumber, Client *parent)
: Channel(new ExchangePrivate(this), parent)
{
Q_D(QAMQP::Exchange);
Q_D(Exchange);
d->init(channelNumber, parent);
}
@ -23,32 +22,32 @@ Exchange::~Exchange()
void Exchange::onOpen()
{
Q_D(QAMQP::Exchange);
Q_D(Exchange);
if (d->delayedDeclare)
d->declare();
}
void Exchange::onClose()
{
Q_D(QAMQP::Exchange);
Q_D(Exchange);
d->remove(true, true);
}
Exchange::ExchangeOptions Exchange::option() const
{
Q_D(const QAMQP::Exchange);
Q_D(const Exchange);
return d->options;
}
QString Exchange::type() const
{
Q_D(const QAMQP::Exchange);
Q_D(const Exchange);
return d->type;
}
void Exchange::declare(const QString &type, ExchangeOptions option , const TableField &arg)
void Exchange::declare(const QString &type, ExchangeOptions option , const Frame::TableField &arg)
{
Q_D(QAMQP::Exchange);
Q_D(Exchange);
d->options = option;
d->type = type;
d->arguments = arg;
@ -57,13 +56,13 @@ void Exchange::declare(const QString &type, ExchangeOptions option , const Table
void Exchange::remove(bool ifUnused, bool noWait)
{
Q_D(QAMQP::Exchange);
Q_D(Exchange);
d->remove(ifUnused, noWait);
}
void Exchange::bind(QAMQP::Queue *queue)
void Exchange::bind(Queue *queue)
{
Q_D(QAMQP::Exchange);
Q_D(Exchange);
queue->bind(this, d->name);
}
@ -82,14 +81,14 @@ void Exchange::bind(const QString &queueName, const QString &key)
void Exchange::publish(const QString &message, const QString &key, const MessageProperties &prop)
{
Q_D(QAMQP::Exchange);
Q_D(Exchange);
d->publish(message.toUtf8(), key, QLatin1String("text.plain"), QVariantHash(), prop);
}
void Exchange::publish(const QByteArray &message, const QString &key,
const QString &mimeType, const MessageProperties &prop)
{
Q_D(QAMQP::Exchange);
Q_D(Exchange);
d->publish(message, key, mimeType, QVariantHash(), prop);
}
@ -97,7 +96,7 @@ void Exchange::publish(const QByteArray &message, const QString &key,
const QVariantHash &headers, const QString &mimeType,
const MessageProperties &prop)
{
Q_D(QAMQP::Exchange);
Q_D(Exchange);
d->publish(message, key, mimeType, headers, prop);
}
@ -114,12 +113,12 @@ ExchangePrivate::~ExchangePrivate()
{
}
bool ExchangePrivate::_q_method(const QAMQP::Frame::Method &frame)
bool ExchangePrivate::_q_method(const Frame::Method &frame)
{
if (ChannelPrivate::_q_method(frame))
return true;
if (frame.methodClass() != QAMQP::Frame::fcExchange)
if (frame.methodClass() != Frame::fcExchange)
return false;
switch(frame.id()) {
@ -136,19 +135,19 @@ bool ExchangePrivate::_q_method(const QAMQP::Frame::Method &frame)
return true;
}
void ExchangePrivate::declareOk(const QAMQP::Frame::Method &frame)
void ExchangePrivate::declareOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(QAMQP::Exchange);
Q_Q(Exchange);
qDebug() << "Declared exchange: " << name;
declared = true;
QMetaObject::invokeMethod(q, "declared");
}
void ExchangePrivate::deleteOk(const QAMQP::Frame::Method &frame)
void ExchangePrivate::deleteOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(QAMQP::Exchange);
Q_Q(Exchange);
qDebug() << "Deleted exchange: " << name;
declared = false;
QMetaObject::invokeMethod(q, "removed");
@ -164,16 +163,16 @@ void ExchangePrivate::declare()
if (name.isEmpty())
return;
QAMQP::Frame::Method frame(QAMQP::Frame::fcExchange, miDeclare);
Frame::Method frame(Frame::fcExchange, miDeclare);
frame.setChannel(number);
QByteArray arguments_;
QDataStream stream(&arguments_, QIODevice::WriteOnly);
stream << qint16(0); //reserver 1
writeField('s', stream, name);
writeField('s', stream, type);
Frame::writeField('s', stream, name);
Frame::writeField('s', stream, type);
stream << qint8(options);
writeField('F', stream, ExchangePrivate::arguments);
Frame::writeField('F', stream, ExchangePrivate::arguments);
frame.setArguments(arguments_);
sendFrame(frame);
@ -182,13 +181,13 @@ void ExchangePrivate::declare()
void ExchangePrivate::remove(bool ifUnused, bool noWait)
{
QAMQP::Frame::Method frame(QAMQP::Frame::fcExchange, miDelete);
Frame::Method frame(Frame::fcExchange, miDelete);
frame.setChannel(number);
QByteArray arguments_;
QDataStream stream(&arguments_, QIODevice::WriteOnly);
stream << qint16(0); //reserver 1
writeField('s', stream, name);
Frame::writeField('s', stream, name);
qint8 flag = 0;
@ -205,25 +204,25 @@ void ExchangePrivate::publish(const QByteArray &message, const QString &key,
const QString &mimeType, const QVariantHash & headers,
const Exchange::MessageProperties &prop)
{
QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmPublish);
Frame::Method frame(Frame::fcBasic, bmPublish);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
writeField('s', out, name);
writeField('s', out, key);
Frame::writeField('s', out, name);
Frame::writeField('s', out, key);
out << qint8(0);
frame.setArguments(arguments_);
sendFrame(frame);
QAMQP::Frame::Content content(QAMQP::Frame::fcBasic);
Frame::Content content(Frame::fcBasic);
content.setChannel(number);
content.setProperty(Content::cpContentType, mimeType);
content.setProperty(Content::cpContentEncoding, "utf-8");
content.setProperty(Content::cpHeaders, headers);
content.setProperty(Content::cpMessageId, "0");
content.setProperty(Frame::Content::cpContentType, mimeType);
content.setProperty(Frame::Content::cpContentEncoding, "utf-8");
content.setProperty(Frame::Content::cpHeaders, headers);
content.setProperty(Frame::Content::cpMessageId, "0");
Exchange::MessageProperties::ConstIterator i;
@ -235,7 +234,7 @@ void ExchangePrivate::publish(const QByteArray &message, const QString &key,
int fullSize = message.size();
for (int sended_ = 0; sended_ < fullSize; sended_+= (FRAME_MAX - 7)) {
QAMQP::Frame::ContentBody body;
Frame::ContentBody body;
QByteArray partition_ = message.mid(sended_, (FRAME_MAX - 7));
body.setChannel(number);
body.setBody(partition_);

View File

@ -6,8 +6,6 @@
namespace QAMQP
{
using namespace QAMQP::Frame;
class Client;
class Queue;
class ClientPrivate;
@ -30,7 +28,7 @@ public:
};
Q_DECLARE_FLAGS(ExchangeOptions, ExchangeOption)
typedef QHash<QAMQP::Frame::Content::Property, QVariant> MessageProperties;
typedef QHash<Frame::Content::Property, QVariant> MessageProperties;
virtual ~Exchange();
@ -39,10 +37,10 @@ public:
void declare(const QString &type = QLatin1String("direct"),
ExchangeOptions option = NoOptions,
const TableField &arg = TableField());
const Frame::TableField &arg = Frame::TableField());
void remove(bool ifUnused = true, bool noWait = true);
void bind(QAMQP::Queue *queue);
void bind(Queue *queue);
void bind(const QString &queueName);
void bind(const QString &queueName, const QString &key);
@ -62,14 +60,15 @@ protected:
void onClose();
private:
explicit Exchange(int channelNumber = -1, Client * parent = 0);
Q_DISABLE_COPY(Exchange)
Q_DECLARE_PRIVATE(QAMQP::Exchange)
Exchange(int channelNumber = -1, Client * parent = 0);
Q_DECLARE_PRIVATE(Exchange)
friend class ClientPrivate;
};
}
} // namespace QAMQP
Q_DECLARE_OPERATORS_FOR_FLAGS(QAMQP::Exchange::ExchangeOptions)

View File

@ -1,3 +1,6 @@
#ifndef amqp_exchange_p_h__
#define amqp_exchange_p_h__
#include "amqp_channel_p.h"
#define METHOD_ID_ENUM(name, id) name = id, name ## Ok
@ -5,7 +8,6 @@
namespace QAMQP
{
using namespace QAMQP::Frame;
class ExchangePrivate: public ChannelPrivate
{
public:
@ -20,8 +22,8 @@ public:
void declare();
void remove(bool ifUnused = true, bool noWait = true);
void declareOk(const QAMQP::Frame::Method &frame);
void deleteOk(const QAMQP::Frame::Method &frame);
void declareOk(const Frame::Method &frame);
void deleteOk(const Frame::Method &frame);
void publish(const QByteArray &message, const QString &key,
const QString &mimeType = QLatin1String("text/plain"),
@ -30,15 +32,17 @@ public:
QString type;
Exchange::ExchangeOptions options;
TableField arguments;
Frame::TableField arguments;
bool _q_method(const QAMQP::Frame::Method &frame);
bool _q_method(const Frame::Method &frame);
void _q_disconnected();
bool delayedDeclare;
bool declared;
Q_DECLARE_PUBLIC(QAMQP::Exchange)
Q_DECLARE_PUBLIC(Exchange)
};
}
} // namespace QAMQP
#endif // amqp_exchange_p_h__

View File

@ -6,7 +6,9 @@
#include <QList>
#include <QDebug>
using namespace QAMQP;
using namespace QAMQP::Frame;
Base::Base(Type type)
: size_(0),
type_(type),
@ -42,42 +44,44 @@ qint32 Base::size() const
return 0;
}
void QAMQP::Frame::Base::writeHeader(QDataStream &stream) const
void Base::writeHeader(QDataStream &stream) const
{
stream << type_;
stream << channel_;
stream << qint32(size());
}
void QAMQP::Frame::Base::writeEnd(QDataStream &stream) const
void Base::writeEnd(QDataStream &stream) const
{
stream << qint8(FRAME_END);
}
void QAMQP::Frame::Base::writePayload( QDataStream & ) const{}
void Base::writePayload(QDataStream &stream) const
{
Q_UNUSED(stream)
}
void QAMQP::Frame::Base::readHeader( QDataStream & stream )
void Base::readHeader(QDataStream &stream)
{
stream >> type_;
stream >> channel_;
stream >> size_;
}
void QAMQP::Frame::Base::readEnd(QDataStream &stream)
void Base::readEnd(QDataStream &stream)
{
unsigned char end_ = 0;
stream.readRawData(reinterpret_cast<char*>(&end_), sizeof(end_));
if (end_ != FRAME_END) {
if (end_ != FRAME_END)
qWarning("Wrong end of frame");
}
}
void QAMQP::Frame::Base::readPayload(QDataStream &stream)
void Base::readPayload(QDataStream &stream)
{
stream.skipRawData(size_);
}
void QAMQP::Frame::Base::toStream(QDataStream &stream) const
void Base::toStream(QDataStream &stream) const
{
writeHeader(stream);
writePayload(stream);
@ -86,47 +90,47 @@ void QAMQP::Frame::Base::toStream(QDataStream &stream) const
//////////////////////////////////////////////////////////////////////////
QAMQP::Frame::Method::Method(MethodClass methodClass, qint16 id)
Frame::Method::Method(MethodClass methodClass, qint16 id)
: Base(ftMethod), methodClass_(methodClass), id_(id)
{
}
QAMQP::Frame::Method::Method(QDataStream &raw)
Frame::Method::Method(QDataStream &raw)
: Base(raw)
{
readPayload(raw);
}
QAMQP::Frame::Method::Method(): Base(ftMethod)
Frame::Method::Method(): Base(ftMethod)
{
}
MethodClass QAMQP::Frame::Method::methodClass() const
MethodClass Frame::Method::methodClass() const
{
return MethodClass(methodClass_);
}
qint16 QAMQP::Frame::Method::id() const
qint16 Frame::Method::id() const
{
return id_;
}
qint32 QAMQP::Frame::Method::size() const
qint32 Frame::Method::size() const
{
return sizeof(id_) + sizeof(methodClass_) + arguments_.size();
}
void QAMQP::Frame::Method::setArguments(const QByteArray &data)
void Frame::Method::setArguments(const QByteArray &data)
{
arguments_ = data;
}
QByteArray QAMQP::Frame::Method::arguments() const
QByteArray Frame::Method::arguments() const
{
return arguments_;
}
void QAMQP::Frame::Method::readPayload(QDataStream &stream)
void Frame::Method::readPayload(QDataStream &stream)
{
stream >> methodClass_;
stream >> id_;
@ -135,7 +139,7 @@ void QAMQP::Frame::Method::readPayload(QDataStream &stream)
stream.readRawData(arguments_.data(), arguments_.size());
}
void QAMQP::Frame::Method::writePayload(QDataStream &stream) const
void Frame::Method::writePayload(QDataStream &stream) const
{
stream << quint16(methodClass_);
stream << quint16(id_);
@ -144,7 +148,7 @@ void QAMQP::Frame::Method::writePayload(QDataStream &stream) const
//////////////////////////////////////////////////////////////////////////
QVariant QAMQP::Frame::readField(qint8 valueType, QDataStream &s)
QVariant Frame::readField(qint8 valueType, QDataStream &s)
{
QVariant value;
QByteArray tmp;
@ -224,10 +228,10 @@ QVariant QAMQP::Frame::readField(qint8 valueType, QDataStream &s)
}
case 'D':
{
QAMQP::Frame::decimal v;
Frame::decimal v;
s >> v.scale;
s >> v.value;
value = QVariant::fromValue<QAMQP::Frame::decimal>(v);
value = QVariant::fromValue<Frame::decimal>(v);
}
break;
case 's':
@ -281,7 +285,7 @@ QVariant QAMQP::Frame::readField(qint8 valueType, QDataStream &s)
return value;
}
QDataStream & QAMQP::Frame::deserialize(QDataStream &stream, QAMQP::Frame::TableField &f)
QDataStream & Frame::deserialize(QDataStream &stream, Frame::TableField &f)
{
QByteArray data;
stream >> data;
@ -297,7 +301,7 @@ QDataStream & QAMQP::Frame::deserialize(QDataStream &stream, QAMQP::Frame::Table
return stream;
}
QDataStream & QAMQP::Frame::serialize(QDataStream &stream, const TableField &f)
QDataStream & Frame::serialize(QDataStream &stream, const TableField &f)
{
QByteArray data;
QDataStream s(&data, QIODevice::WriteOnly);
@ -316,7 +320,7 @@ QDataStream & QAMQP::Frame::serialize(QDataStream &stream, const TableField &f)
return stream;
}
void QAMQP::Frame::print(const TableField &f)
void Frame::print(const TableField &f)
{
TableField::ConstIterator i;
for (i = f.begin(); i != f.end(); ++i) {
@ -333,7 +337,7 @@ void QAMQP::Frame::print(const TableField &f)
}
}
void QAMQP::Frame::writeField(qint8 valueType, QDataStream &s, const QVariant &value, bool withType)
void Frame::writeField(qint8 valueType, QDataStream &s, const QVariant &value, bool withType)
{
QByteArray tmp;
if (withType)
@ -375,7 +379,7 @@ void QAMQP::Frame::writeField(qint8 valueType, QDataStream &s, const QVariant &v
break;
case 'D':
{
QAMQP::Frame::decimal v(value.value<QAMQP::Frame::decimal>());
Frame::decimal v(value.value<Frame::decimal>());
s << v.scale;
s << v.value;
}
@ -418,7 +422,7 @@ void QAMQP::Frame::writeField(qint8 valueType, QDataStream &s, const QVariant &v
}
}
void QAMQP::Frame::writeField(QDataStream &s, const QVariant &value)
void Frame::writeField(QDataStream &s, const QVariant &value)
{
char type = 0;
switch (value.type()) {
@ -484,29 +488,29 @@ void QAMQP::Frame::writeField(QDataStream &s, const QVariant &value)
//////////////////////////////////////////////////////////////////////////
QAMQP::Frame::Content::Content()
Content::Content()
: Base(ftHeader)
{
}
QAMQP::Frame::Content::Content(MethodClass methodClass)
Content::Content(MethodClass methodClass)
: Base(ftHeader)
{
methodClass_ = methodClass;
}
QAMQP::Frame::Content::Content(QDataStream &raw)
Content::Content(QDataStream &raw)
: Base(raw)
{
readPayload(raw);
}
QAMQP::Frame::MethodClass QAMQP::Frame::Content::methodClass() const
MethodClass Content::methodClass() const
{
return MethodClass(methodClass_);
}
qint32 QAMQP::Frame::Content::size() const
qint32 Content::size() const
{
QDataStream out(&buffer_, QIODevice::WriteOnly);
buffer_.clear();
@ -564,32 +568,32 @@ qint32 QAMQP::Frame::Content::size() const
return buffer_.size();
}
void QAMQP::Frame::Content::setBody(const QByteArray &data)
void Content::setBody(const QByteArray &data)
{
body_ = data;
}
QByteArray QAMQP::Frame::Content::body() const
QByteArray Content::body() const
{
return body_;
}
void QAMQP::Frame::Content::setProperty(Property prop, const QVariant &value)
void Content::setProperty(Property prop, const QVariant &value)
{
properties_[prop] = value;
}
QVariant QAMQP::Frame::Content::property(Property prop) const
QVariant Content::property(Property prop) const
{
return properties_.value(prop);
}
void QAMQP::Frame::Content::writePayload(QDataStream &out) const
void Content::writePayload(QDataStream &out) const
{
out.writeRawData(buffer_.data(), buffer_.size());
}
void QAMQP::Frame::Content::readPayload(QDataStream &in)
void Content::readPayload(QDataStream &in)
{
in >> methodClass_;
in.skipRawData(2); //weight
@ -639,7 +643,7 @@ void QAMQP::Frame::Content::readPayload(QDataStream &in)
properties_[cpClusterID] = readField('s', in);
}
qlonglong QAMQP::Frame::Content::bodySize() const
qlonglong Content::bodySize() const
{
return body_.isEmpty() ? bodySize_ : body_.size();
}
@ -650,44 +654,51 @@ ContentBody::ContentBody()
{
}
QAMQP::Frame::ContentBody::ContentBody(QDataStream &raw)
ContentBody::ContentBody(QDataStream &raw)
: Base(raw)
{
readPayload(raw);
}
void QAMQP::Frame::ContentBody::setBody(const QByteArray &data)
void ContentBody::setBody(const QByteArray &data)
{
body_ = data;
}
QByteArray QAMQP::Frame::ContentBody::body() const
QByteArray ContentBody::body() const
{
return body_;
}
void QAMQP::Frame::ContentBody::writePayload(QDataStream &out) const
void ContentBody::writePayload(QDataStream &out) const
{
out.writeRawData(body_.data(), body_.size());
}
void QAMQP::Frame::ContentBody::readPayload(QDataStream &in)
void ContentBody::readPayload(QDataStream &in)
{
body_.resize(size_);
in.readRawData(body_.data(), body_.size());
}
qint32 QAMQP::Frame::ContentBody::size() const
qint32 ContentBody::size() const
{
return body_.size();
}
//////////////////////////////////////////////////////////////////////////
QAMQP::Frame::Heartbeat::Heartbeat()
Heartbeat::Heartbeat()
: Base(ftHeartbeat)
{
}
void QAMQP::Frame::Heartbeat::readPayload(QDataStream &) {}
void QAMQP::Frame::Heartbeat::writePayload(QDataStream &) const {}
void Heartbeat::readPayload(QDataStream &stream)
{
Q_UNUSED(stream)
}
void Heartbeat::writePayload(QDataStream &stream) const
{
Q_UNUSED(stream)
}

View File

@ -28,7 +28,6 @@ namespace QAMQP
{
class QueuePrivate;
namespace Frame
{
typedef quint16 channel_t;
@ -86,12 +85,12 @@ namespace Frame
*/
typedef QHash<QString, QVariant> TableField;
QDataStream & serialize( QDataStream & stream, const QAMQP::Frame::TableField & f );
QDataStream & deserialize( QDataStream & stream, QAMQP::Frame::TableField & f );
QDataStream & serialize( QDataStream & stream, const Frame::TableField & f );
QDataStream & deserialize( QDataStream & stream, Frame::TableField & f );
QVariant readField( qint8 valueType, QDataStream &s );
void writeField( QDataStream &s, const QVariant & value );
void writeField( qint8 valueType, QDataStream &s, const QVariant & value, bool withType = false );
void print( const QAMQP::Frame::TableField & f );
void print( const Frame::TableField & f );
/*
* @brief Base class for any frames.
@ -280,8 +279,6 @@ namespace Frame
*/
class Content : public Base
{
friend class QAMQP::QueuePrivate;
public:
/*
* Default content frame property
@ -357,6 +354,9 @@ namespace Frame
mutable QByteArray buffer_;
QHash<int, QVariant> properties_;
qlonglong bodySize_;
private:
friend class QAMQP::QueuePrivate;
};
class ContentBody : public Base
@ -398,22 +398,24 @@ namespace Frame
class MethodHandler
{
public:
virtual void _q_method(const QAMQP::Frame::Method &frame) = 0;
virtual void _q_method(const Frame::Method &frame) = 0;
};
class ContentHandler
{
public:
virtual void _q_content(const QAMQP::Frame::Content & frame) = 0;
virtual void _q_content(const Frame::Content & frame) = 0;
};
class ContentBodyHandler
{
public:
virtual void _q_body(const QAMQP::Frame::ContentBody & frame) = 0;
virtual void _q_body(const Frame::ContentBody & frame) = 0;
};
}
}
} // namespace Frame
} // namespace QAMQP
Q_DECLARE_METATYPE(QAMQP::Frame::decimal)
Q_DECLARE_METATYPE(QAMQP::Frame::TableField)

View File

@ -1,3 +1,6 @@
#ifndef amqp_message_h__
#define amqp_message_h__
#include "amqp_frame.h"
#include <QByteArray>
#include <QHash>
@ -11,18 +14,20 @@ struct Message
Message();
virtual ~Message();
typedef QAMQP::Frame::Content::Property MessageProperty;
typedef Frame::Content::Property MessageProperty;
Q_DECLARE_FLAGS(MessageProperties, MessageProperty)
qlonglong deliveryTag;
QByteArray payload;
QHash<MessageProperty, QVariant> property;
QAMQP::Frame::TableField headers;
Frame::TableField headers;
QString routeKey;
QString exchangeName;
int leftSize;
};
typedef QSharedPointer<QAMQP::Message> MessagePtr;
typedef QSharedPointer<Message> MessagePtr;
}
} // namespace QAMQP
#endif // amqp_message_h__

View File

@ -24,7 +24,7 @@ public:
void disconnect();
void sendFrame();
void sendFrame(const QAMQP::Frame::Base &frame);
void sendFrame(const Frame::Base &frame);
bool isSsl() const;
void setSsl(bool value);
@ -72,5 +72,6 @@ private:
QHash<Channel, QList<Frame::ContentBodyHandler*> > m_bodyHandlersByChannel;
};
}
} // namespace QAMQP
#endif // amqp_network_h__

View File

@ -3,7 +3,6 @@
#include "amqp_exchange.h"
using namespace QAMQP;
using namespace QAMQP::Frame;
#include <QCoreApplication>
#include <QDebug>
@ -13,7 +12,7 @@ using namespace QAMQP::Frame;
Queue::Queue(int channelNumber, Client *parent)
: Channel(new QueuePrivate(this), parent)
{
Q_D(QAMQP::Queue);
Q_D(Queue);
d->init(channelNumber, parent);
}
@ -24,7 +23,7 @@ Queue::~Queue()
void Queue::onOpen()
{
Q_D(QAMQP::Queue);
Q_D(Queue);
if (d->delayedDeclare)
d->declare();
@ -38,37 +37,37 @@ void Queue::onOpen()
void Queue::onClose()
{
Q_D(QAMQP::Queue);
Q_D(Queue);
d->remove(true, true);
}
Queue::QueueOptions Queue::option() const
{
Q_D(const QAMQP::Queue);
Q_D(const Queue);
return d->options;
}
void Queue::setNoAck(bool noAck)
{
Q_D(QAMQP::Queue);
Q_D(Queue);
d->noAck = noAck;
}
bool Queue::noAck() const
{
Q_D(const QAMQP::Queue);
Q_D(const Queue);
return d->noAck;
}
void Queue::declare()
{
Q_D(QAMQP::Queue);
Q_D(Queue);
declare(d->name, QueueOptions(Durable | AutoDelete));
}
void Queue::declare(const QString &name, QueueOptions options)
{
Q_D(QAMQP::Queue);
Q_D(Queue);
setName(name);
d->options = options;
d->declare();
@ -76,63 +75,63 @@ void Queue::declare(const QString &name, QueueOptions options)
void Queue::remove(bool ifUnused, bool ifEmpty, bool noWait)
{
Q_D(QAMQP::Queue);
Q_D(Queue);
d->remove(ifUnused, ifEmpty, noWait);
}
void Queue::purge()
{
Q_D(QAMQP::Queue);
Q_D(Queue);
d->purge();
}
void Queue::bind(const QString &exchangeName, const QString &key)
{
Q_D(QAMQP::Queue);
Q_D(Queue);
d->bind(exchangeName, key);
}
void Queue::bind(Exchange *exchange, const QString &key)
{
Q_D(QAMQP::Queue);
Q_D(Queue);
if (exchange)
d->bind(exchange->name(), key);
}
void Queue::unbind(const QString &exchangeName, const QString &key)
{
Q_D(QAMQP::Queue);
Q_D(Queue);
d->unbind(exchangeName, key);
}
void Queue::unbind(Exchange *exchange, const QString &key)
{
Q_D(QAMQP::Queue);
Q_D(Queue);
if (exchange)
d->unbind(exchange->name(), key);
}
void Queue::_q_content(const Content &frame)
void Queue::_q_content(const Frame::Content &frame)
{
Q_D(QAMQP::Queue);
Q_D(Queue);
d->_q_content(frame);
}
void Queue::_q_body(const ContentBody &frame)
void Queue::_q_body(const Frame::ContentBody &frame)
{
Q_D(QAMQP::Queue);
Q_D(Queue);
d->_q_body(frame);
}
QAMQP::MessagePtr Queue::getMessage()
MessagePtr Queue::getMessage()
{
Q_D(QAMQP::Queue);
Q_D(Queue);
return d->messages_.dequeue();
}
bool Queue::hasMessage() const
{
Q_D(const QAMQP::Queue);
Q_D(const Queue);
if (d->messages_.isEmpty())
return false;
@ -142,31 +141,31 @@ bool Queue::hasMessage() const
void Queue::consume(ConsumeOptions options)
{
Q_D(QAMQP::Queue);
Q_D(Queue);
d->consume(options);
}
void Queue::setConsumerTag(const QString &consumerTag)
{
Q_D(QAMQP::Queue);
Q_D(Queue);
d->consumerTag = consumerTag;
}
QString Queue::consumerTag() const
{
Q_D(const QAMQP::Queue);
Q_D(const Queue);
return d->consumerTag;
}
void Queue::get()
{
Q_D(QAMQP::Queue);
Q_D(Queue);
d->get();
}
void Queue::ack(const MessagePtr &message)
{
Q_D(QAMQP::Queue);
Q_D(Queue);
d->ack(message);
}
@ -186,13 +185,13 @@ QueuePrivate::~QueuePrivate()
{
}
bool QueuePrivate::_q_method(const QAMQP::Frame::Method &frame)
bool QueuePrivate::_q_method(const Frame::Method &frame)
{
Q_Q(QAMQP::Queue);
Q_Q(Queue);
if (ChannelPrivate::_q_method(frame))
return true;
if (frame.methodClass() == QAMQP::Frame::fcQueue) {
if (frame.methodClass() == Frame::fcQueue) {
switch (frame.id()) {
case miDeclareOk:
declareOk(frame);
@ -216,7 +215,7 @@ bool QueuePrivate::_q_method(const QAMQP::Frame::Method &frame)
return true;
}
if (frame.methodClass() == QAMQP::Frame::fcBasic) {
if (frame.methodClass() == Frame::fcBasic) {
switch(frame.id()) {
case bmConsumeOk:
consumeOk(frame);
@ -239,16 +238,16 @@ bool QueuePrivate::_q_method(const QAMQP::Frame::Method &frame)
return false;
}
void QueuePrivate::declareOk(const QAMQP::Frame::Method &frame)
void QueuePrivate::declareOk(const Frame::Method &frame)
{
Q_Q(QAMQP::Queue);
Q_Q(Queue);
qDebug() << "Declared queue: " << name;
declared = true;
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
name = readField('s', stream).toString();
name = Frame::readField('s', stream).toString();
qint32 messageCount = 0, consumerCount = 0;
stream >> messageCount >> consumerCount;
qDebug("Message count %d\nConsumer count: %d", messageCount, consumerCount);
@ -256,9 +255,9 @@ void QueuePrivate::declareOk(const QAMQP::Frame::Method &frame)
QMetaObject::invokeMethod(q, "declared");
}
void QueuePrivate::deleteOk(const QAMQP::Frame::Method &frame)
void QueuePrivate::deleteOk(const Frame::Method &frame)
{
Q_Q(QAMQP::Queue);
Q_Q(Queue);
qDebug() << "Deleted or purged queue: " << name;
declared = false;
@ -270,19 +269,19 @@ void QueuePrivate::deleteOk(const QAMQP::Frame::Method &frame)
QMetaObject::invokeMethod(q, "removed");
}
void QueuePrivate::bindOk(const QAMQP::Frame::Method &frame)
void QueuePrivate::bindOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(QAMQP::Queue);
Q_Q(Queue);
qDebug() << "Binded to queue: " << name;
QMetaObject::invokeMethod(q, "binded", Q_ARG(bool, true));
}
void QueuePrivate::unbindOk(const QAMQP::Frame::Method &frame)
void QueuePrivate::unbindOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(QAMQP::Queue);
Q_Q(Queue);
qDebug() << "Unbinded queue: " << name;
QMetaObject::invokeMethod(q, "binded", Q_ARG(bool, false));
@ -295,14 +294,14 @@ void QueuePrivate::declare()
return;
}
QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miDeclare);
Frame::Method frame(Frame::fcQueue, miDeclare);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
writeField('s', out, name);
Frame::writeField('s', out, name);
out << qint8(options);
writeField('F', out, TableField());
Frame::writeField('F', out, Frame::TableField());
frame.setArguments(arguments_);
sendFrame(frame);
@ -314,13 +313,13 @@ void QueuePrivate::remove(bool ifUnused, bool ifEmpty, bool noWait)
if (!declared)
return;
QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miDelete);
Frame::Method frame(Frame::fcQueue, miDelete);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
writeField('s', out, name);
Frame::writeField('s', out, name);
qint8 flag = 0;
@ -339,12 +338,12 @@ void QueuePrivate::purge()
if (!opened)
return;
QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miPurge);
Frame::Method frame(Frame::fcQueue, miPurge);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
writeField('s', out, name);
Frame::writeField('s', out, name);
out << qint8(0); // no-wait
frame.setArguments(arguments_);
sendFrame(frame);
@ -357,16 +356,16 @@ void QueuePrivate::bind(const QString & exchangeName, const QString &key)
return;
}
QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miBind);
Frame::Method frame(Frame::fcQueue, miBind);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
writeField('s', out, name);
writeField('s', out, exchangeName);
writeField('s', out, key);
Frame::writeField('s', out, name);
Frame::writeField('s', out, exchangeName);
Frame::writeField('s', out, key);
out << qint8(0); // no-wait
writeField('F', out, TableField());
Frame::writeField('F', out, Frame::TableField());
frame.setArguments(arguments_);
sendFrame(frame);
@ -377,15 +376,15 @@ void QueuePrivate::unbind(const QString &exchangeName, const QString &key)
if (!opened)
return;
QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miUnbind);
Frame::Method frame(Frame::fcQueue, miUnbind);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
writeField('s', out, name);
writeField('s', out, exchangeName);
writeField('s', out, key);
writeField('F', out, TableField());
Frame::writeField('s', out, name);
Frame::writeField('s', out, exchangeName);
Frame::writeField('s', out, key);
Frame::writeField('F', out, Frame::TableField());
frame.setArguments(arguments_);
sendFrame(frame);
@ -396,27 +395,27 @@ void QueuePrivate::get()
if (!opened)
return;
QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmGet);
Frame::Method frame(Frame::fcBasic, bmGet);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
writeField('s', out, name);
Frame::writeField('s', out, name);
out << qint8(noAck ? 1 : 0); // noAck
frame.setArguments(arguments_);
sendFrame(frame);
}
void QueuePrivate::getOk(const QAMQP::Frame::Method &frame)
void QueuePrivate::getOk(const Frame::Method &frame)
{
QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly);
qlonglong deliveryTag = readField('L',in).toLongLong();
bool redelivered = readField('t',in).toBool();
QString exchangeName = readField('s',in).toString();
QString routingKey = readField('s',in).toString();
qlonglong deliveryTag = Frame::readField('L',in).toLongLong();
bool redelivered = Frame::readField('t',in).toBool();
QString exchangeName = Frame::readField('s',in).toString();
QString routingKey = Frame::readField('s',in).toString();
Q_UNUSED(redelivered)
@ -432,7 +431,7 @@ void QueuePrivate::ack(const MessagePtr &Message)
if (!opened)
return;
QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmAck);
Frame::Method frame(Frame::fcBasic, bmAck);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
@ -448,43 +447,43 @@ void QueuePrivate::consume(Queue::ConsumeOptions options)
if (!opened)
return;
QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmConsume);
Frame::Method frame(Frame::fcBasic, bmConsume);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
writeField('s', out, name);
writeField('s', out, consumerTag);
Frame::writeField('s', out, name);
Frame::writeField('s', out, consumerTag);
out << qint8(options); // no-wait
writeField('F', out, TableField());
Frame::writeField('F', out, Frame::TableField());
frame.setArguments(arguments_);
sendFrame(frame);
}
void QueuePrivate::consumeOk(const QAMQP::Frame::Method &frame)
void QueuePrivate::consumeOk(const Frame::Method &frame)
{
qDebug() << "Consume ok: " << name;
declared = false;
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
consumerTag = readField('s',stream).toString();
consumerTag = Frame::readField('s',stream).toString();
qDebug("Consumer tag = %s", qPrintable(consumerTag));
}
void QueuePrivate::deliver(const QAMQP::Frame::Method &frame)
void QueuePrivate::deliver(const Frame::Method &frame)
{
QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly);
QString consumer_ = readField('s',in).toString();
QString consumer_ = Frame::readField('s',in).toString();
if (consumer_ != consumerTag)
return;
qlonglong deliveryTag = readField('L',in).toLongLong();
bool redelivered = readField('t',in).toBool();
QString exchangeName = readField('s',in).toString();
QString routingKey = readField('s',in).toString();
qlonglong deliveryTag = Frame::readField('L',in).toLongLong();
bool redelivered = Frame::readField('t',in).toBool();
QString exchangeName = Frame::readField('s',in).toString();
QString routingKey = Frame::readField('s',in).toString();
Q_UNUSED(redelivered)
@ -495,7 +494,7 @@ void QueuePrivate::deliver(const QAMQP::Frame::Method &frame)
messages_.enqueue(newMessage);
}
void QueuePrivate::_q_content(const QAMQP::Frame::Content &frame)
void QueuePrivate::_q_content(const Frame::Content &frame)
{
Q_ASSERT(frame.channel() == number);
if (frame.channel() != number)
@ -513,9 +512,9 @@ void QueuePrivate::_q_content(const QAMQP::Frame::Content &frame)
message->property[Message::MessageProperty(i.key())]= i.value();
}
void QueuePrivate::_q_body(const QAMQP::Frame::ContentBody &frame)
void QueuePrivate::_q_body(const Frame::ContentBody &frame)
{
Q_Q(QAMQP::Queue);
Q_Q(Queue);
Q_ASSERT(frame.channel() == number);
if (frame.channel() != number)
return;

View File

@ -68,7 +68,7 @@ Q_SIGNALS:
void declared();
void binded(bool);
void removed();
void messageReceived(QAMQP::Queue *pQueue);
void messageReceived(Queue *pQueue);
void empty();
protected:
@ -78,18 +78,15 @@ protected:
private:
Queue(int channelNumber = -1, Client * parent = 0);
void _q_content(const QAMQP::Frame::Content & frame);
void _q_body(const QAMQP::Frame::ContentBody & frame);
void _q_content(const Frame::Content & frame);
void _q_body(const Frame::ContentBody & frame);
Q_DISABLE_COPY(Queue)
Q_DECLARE_PRIVATE(QAMQP::Queue)
Q_DECLARE_PRIVATE(Queue)
friend class ClientPrivate;
};
}
#ifdef QAMQP_P_INCLUDE
# include "amqp_queue_p.h"
} // namespace QAMQP
#endif
#endif // amqp_queue_h__

View File

@ -1,15 +1,14 @@
#ifndef amqp_queue_p_h__
#define amqp_queue_p_h__
#include "amqp_channel_p.h"
#define METHOD_ID_ENUM(name, id) name = id, name ## Ok
#include <QQueue>
#include "amqp_channel_p.h"
#define METHOD_ID_ENUM(name, id) name = id, name ## Ok
namespace QAMQP
{
using namespace QAMQP::Frame;
class QueuePrivate: public ChannelPrivate
{
public:
@ -30,27 +29,27 @@ public:
void bind(const QString &exchangeName, const QString &key);
void unbind(const QString &exchangeName, const QString &key);
void declareOk(const QAMQP::Frame::Method &frame);
void deleteOk(const QAMQP::Frame::Method &frame);
void bindOk(const QAMQP::Frame::Method &frame);
void unbindOk(const QAMQP::Frame::Method &frame);
void declareOk(const Frame::Method &frame);
void deleteOk(const Frame::Method &frame);
void bindOk(const Frame::Method &frame);
void unbindOk(const Frame::Method &frame);
/************************************************************************/
/* CLASS BASIC METHODS */
/************************************************************************/
void consume(Queue::ConsumeOptions options);
void consumeOk(const QAMQP::Frame::Method &frame);
void deliver(const QAMQP::Frame::Method &frame);
void consumeOk(const Frame::Method &frame);
void deliver(const Frame::Method &frame);
void get();
void getOk(const QAMQP::Frame::Method &frame);
void getOk(const Frame::Method &frame);
void ack(const MessagePtr &Message);
QString type;
Queue::QueueOptions options;
bool _q_method(const QAMQP::Frame::Method &frame);
bool _q_method(const Frame::Method &frame);
bool delayedDeclare;
bool declared;
@ -58,16 +57,17 @@ public:
QString consumerTag;
QQueue<QPair<QString, QString> > delayedBindings;
QQueue<QAMQP::MessagePtr> messages_;
QQueue<MessagePtr> messages_;
bool recievingMessage;
void _q_content(const QAMQP::Frame::Content &frame);
void _q_body(const QAMQP::Frame::ContentBody &frame);
void _q_content(const Frame::Content &frame);
void _q_body(const Frame::ContentBody &frame);
Q_DECLARE_PUBLIC(QAMQP::Queue)
Q_DECLARE_PUBLIC(Queue)
};
}
} // namespace QAMQP
#endif // amqp_queue_p_h__