use Q_Q/Q_D from Qt rather than P_Q/P_D, style cleanups for Qt coding style

delete exception-related code, as Qt does not officially support them anymore
This commit is contained in:
Matt Broadstone 2014-05-28 12:25:28 -04:00
parent 387af4038e
commit c069b063d3
25 changed files with 4134 additions and 4301 deletions

View File

@ -7,6 +7,7 @@
#include "amqp_exchange_p.h"
#include "amqp_queue.h"
#include "amqp_queue_p.h"
#include "amqp_connection_p.h"
#include "amqp_authenticator.h"
using namespace QAMQP;
@ -31,56 +32,47 @@ struct ClientExceptionCleaner
//////////////////////////////////////////////////////////////////////////
ClientPrivate::ClientPrivate( Client * q ) :
port(AMQPPORT)
, host(QString::fromLatin1(AMQPHOST))
, virtualHost(QString::fromLatin1(AMQPVHOST))
, pq_ptr(q)
ClientPrivate::ClientPrivate(Client * q)
: port(AMQPPORT),
host(QString::fromLatin1(AMQPHOST)),
virtualHost(QString::fromLatin1(AMQPVHOST)),
q_ptr(q)
{
}
ClientPrivate::~ClientPrivate()
{
}
void ClientPrivate::init(QObject *parent)
{
pq_func()->setParent(parent);
Q_Q(QAMQP::Client);
q->setParent(parent);
if (!network_)
{
network_ = new QAMQP::Network(pq_func());
}
network_ = new QAMQP::Network(q);
if (!connection_)
{
connection_ = new QAMQP::Connection(pq_func());
}
connection_ = new QAMQP::Connection(q);
network_->setMethodHandlerConnection(connection_);
setAuth(new AMQPlainAuthenticator(QString::fromLatin1(AMQPLOGIN), QString::fromLatin1(AMQPPSWD)));
QObject::connect(connection_, SIGNAL(connected()), pq_func(), SIGNAL(connected()));
QObject::connect(connection_, SIGNAL(disconnected()), pq_func(), SIGNAL(disconnected()));
QObject::connect(connection_, SIGNAL(connected()), q, SIGNAL(connected()));
QObject::connect(connection_, SIGNAL(disconnected()), q, SIGNAL(disconnected()));
}
void ClientPrivate::init(QObject * parent, const QUrl & con)
void ClientPrivate::init(QObject *parent, const QUrl &connectionString)
{
init(parent);
parseCnnString(con);
ClientPrivate::connect();
parseConnectionString(connectionString);
connect();
}
void ClientPrivate::setAuth(Authenticator *auth)
{
auth_ = QSharedPointer<Authenticator>(auth);
}
void ClientPrivate::printConnect() const
{
QTextStream stream(stdout);
@ -88,58 +80,57 @@ void ClientPrivate::printConnect() const
stream << "host = " << host << endl;
stream << "vhost = " << virtualHost << endl;
if(auth_ && auth_->type() == "AMQPLAIN")
{
if (auth_ && auth_->type() == QLatin1String("AMQPLAIN")) {
QSharedPointer<AMQPlainAuthenticator> a = auth_.staticCast<AMQPlainAuthenticator>();
stream << "user = " << a->login() << endl;
stream << "passw = " << a->password() << endl;
}
}
void ClientPrivate::connect()
{
ClientPrivate::sockConnect();
ClientPrivate::login();
sockConnect();
login();
}
void ClientPrivate::parseCnnString( const QUrl & con )
void ClientPrivate::parseConnectionString(const QUrl &connectionString)
{
P_Q(QAMQP::Client);
if(con.scheme() == AMQPSCHEME || con.scheme() == AMQPSSCHEME )
{
q->setSsl(con.scheme() == AMQPSSCHEME);
q->setPassword(con.password());
q->setUser(con.userName());
q->setPort(con.port(AMQPPORT));
q->setHost(con.host());
q->setVirtualHost(con.path());
Q_Q(QAMQP::Client);
if (connectionString.scheme() != AMQPSCHEME &&
connectionString.scheme() != AMQPSSCHEME) {
qDebug() << Q_FUNC_INFO << "invalid scheme: " << connectionString.scheme();
return;
}
q->setSsl(connectionString.scheme() == AMQPSSCHEME);
q->setPassword(connectionString.password());
q->setUser(connectionString.userName());
q->setPort(connectionString.port(AMQPPORT));
q->setHost(connectionString.host());
q->setVirtualHost(connectionString.path());
}
void ClientPrivate::sockConnect()
{
if (network_->state() != QAbstractSocket::UnconnectedState)
{
disconnect();
}
network_->connectTo(host, port);
}
void ClientPrivate::login()
{
}
Exchange *ClientPrivate::createExchange(int channelNumber, const QString &name)
{
Exchange * exchange_ = new Exchange(channelNumber, pq_func());
Q_Q(QAMQP::Client);
Exchange * exchange_ = new Exchange(channelNumber, q);
network_->addMethodHandlerForChannel(exchange_->channelNumber(), exchange_);
QObject::connect(connection_, SIGNAL(connected()), exchange_, SLOT(_q_open()));
exchange_->pd_func()->open();
QObject::connect(pq_func(), SIGNAL(disconnected()), exchange_, SLOT(_q_disconnected()));
exchange_->d_func()->open();
QObject::connect(q, SIGNAL(disconnected()), exchange_, SLOT(_q_disconnected()));
exchange_->setName(name);
return exchange_;
@ -147,117 +138,103 @@ Exchange * ClientPrivate::createExchange(int channelNumber, const QString &name
Queue *ClientPrivate::createQueue(int channelNumber, const QString &name )
{
Queue * queue_ = new Queue(channelNumber, pq_func());
Q_Q(QAMQP::Client);
Queue *queue_ = new Queue(channelNumber, q);
network_->addMethodHandlerForChannel(queue_->channelNumber(), queue_);
network_->addContentHandlerForChannel(queue_->channelNumber(), queue_);
network_->addContentBodyHandlerForChannel(queue_->channelNumber(), queue_);
QObject::connect(connection_, SIGNAL(connected()), queue_, SLOT(_q_open()));
queue_->pd_func()->open();
QObject::connect(pq_func(), SIGNAL(disconnected()), queue_, SLOT(_q_disconnected()));
queue_->d_func()->open();
QObject::connect(q, SIGNAL(disconnected()), queue_, SLOT(_q_disconnected()));
queue_->setName(name);
return queue_;
}
void ClientPrivate::disconnect()
{
P_Q(QAMQP::Client);
Q_UNUSED(q);
if(network_->state() != QAbstractSocket::UnconnectedState)
{
network_->QAMQP::Network::disconnect();
connection_->pd_func()->connected = false;
emit pq_func()->disconnected();
Q_Q(QAMQP::Client);
if (network_->state() != QAbstractSocket::UnconnectedState) {
network_->disconnect();
connection_->d_func()->connected = false;
Q_EMIT q->disconnected();
}
}
//////////////////////////////////////////////////////////////////////////
QAMQP::Client::Client( QObject * parent /*= 0*/ )
: pd_ptr(new ClientPrivate(this))
QAMQP::Client::Client(QObject *parent)
: QObject(parent),
d_ptr(new ClientPrivate(this))
{
QT_TRY {
pd_func()->init(parent);
} QT_CATCH(...) {
ClientExceptionCleaner::cleanup(this, pd_func());
QT_RETHROW;
}
d_ptr->init(parent);
}
QAMQP::Client::Client( const QUrl & connectionString, QObject * parent /*= 0*/ )
: pd_ptr(new ClientPrivate(this))
QAMQP::Client::Client(const QUrl & connectionString, QObject * parent)
: d_ptr(new ClientPrivate(this))
{
QT_TRY {
pd_func()->init(parent, connectionString);
} QT_CATCH(...) {
ClientExceptionCleaner::cleanup(this, pd_func());
QT_RETHROW;
}
d_ptr->init(parent, connectionString);
}
QAMQP::Client::~Client()
{
QT_TRY {
QEvent e(QEvent::Destroy);
QCoreApplication::sendEvent(this, &e);
} QT_CATCH(const std::exception&) {
// if this fails we can't do anything about it but at least we are not allowed to throw.
}
}
quint32 QAMQP::Client::port() const
quint16 QAMQP::Client::port() const
{
return pd_func()->port;
Q_D(const QAMQP::Client);
return d->port;
}
void QAMQP::Client::setPort( quint32 port )
void QAMQP::Client::setPort(quint16 port)
{
pd_func()->port = port;
Q_D(QAMQP::Client);
d->port = port;
}
QString QAMQP::Client::host() const
{
return pd_func()->host;
Q_D(const QAMQP::Client);
return d->host;
}
void QAMQP::Client::setHost( const QString & host )
{
pd_func()->host = host;
Q_D(QAMQP::Client);
d->host = host;
}
QString QAMQP::Client::virtualHost() const
{
return pd_func()->virtualHost;
Q_D(const QAMQP::Client);
return d->virtualHost;
}
void QAMQP::Client::setVirtualHost(const QString &virtualHost)
{
pd_func()->virtualHost = virtualHost;
Q_D(QAMQP::Client);
d->virtualHost = virtualHost;
}
QString QAMQP::Client::user() const
{
const Authenticator * auth = pd_func()->auth_.data();
if(auth && auth->type() == "AMQPLAIN")
{
Q_D(const QAMQP::Client);
const Authenticator * auth = d->auth_.data();
if (auth && auth->type() == QLatin1String("AMQPLAIN")) {
const AMQPlainAuthenticator * a = static_cast<const AMQPlainAuthenticator *>(auth);
return a->login();
}
return QString();
}
void QAMQP::Client::setUser(const QString &user)
{
Authenticator * auth = pd_func()->auth_.data();
if(auth && auth->type() == "AMQPLAIN")
{
Q_D(const QAMQP::Client);
Authenticator * auth = d->auth_.data();
if (auth && auth->type() == QLatin1String("AMQPLAIN")) {
AMQPlainAuthenticator * a = static_cast<AMQPlainAuthenticator *>(auth);
a->setLogin(user);
}
@ -265,22 +242,21 @@ void QAMQP::Client::setUser( const QString & user )
QString QAMQP::Client::password() const
{
const Authenticator * auth = pd_func()->auth_.data();
if(auth && auth->type() == "AMQPLAIN")
{
Q_D(const QAMQP::Client);
const Authenticator * auth = d->auth_.data();
if (auth && auth->type() == "AMQPLAIN") {
const AMQPlainAuthenticator * a = static_cast<const AMQPlainAuthenticator *>(auth);
return a->password();
}
return QString();
}
void QAMQP::Client::setPassword(const QString &password)
{
Authenticator * auth = pd_func()->auth_.data();
if(auth && auth->type() == "AMQPLAIN")
{
Q_D(QAMQP::Client);
Authenticator *auth = d->auth_.data();
if (auth && auth->type() == QLatin1String("AMQPLAIN")) {
AMQPlainAuthenticator * a = static_cast<AMQPlainAuthenticator *>(auth);
a->setPassword(password);
}
@ -289,98 +265,116 @@ void QAMQP::Client::setPassword( const QString & password )
void QAMQP::Client::printConnect() const
{
#ifdef _DEBUG
pd_func()->printConnect();
Q_D(const QAMQP::Client);
d->printConnect();
#endif // _DEBUG
}
void QAMQP::Client::closeChannel()
{
}
Exchange *QAMQP::Client::createExchange(int channelNumber)
{
return pd_func()->createExchange(channelNumber, QString());
Q_D(QAMQP::Client);
return d->createExchange(channelNumber, QString());
}
Exchange *QAMQP::Client::createExchange( const QString &name, int channelNumber )
{
return pd_func()->createExchange(channelNumber, name);
Q_D(QAMQP::Client);
return d->createExchange(channelNumber, name);
}
Queue *QAMQP::Client::createQueue(int channelNumber)
{
return pd_func()->createQueue(channelNumber, QString());
Q_D(QAMQP::Client);
return d->createQueue(channelNumber, QString());
}
Queue *QAMQP::Client::createQueue( const QString &name, int channelNumber )
{
return pd_func()->createQueue(channelNumber, name);
Q_D(QAMQP::Client);
return d->createQueue(channelNumber, name);
}
void QAMQP::Client::open()
{
return pd_func()->connect();
Q_D(QAMQP::Client);
return d->connect();
}
void QAMQP::Client::open(const QUrl &connectionString)
{
pd_func()->parseCnnString(connectionString);
Q_D(QAMQP::Client);
d->parseConnectionString(connectionString);
open();
}
void QAMQP::Client::close()
{
return pd_func()->disconnect();
Q_D(QAMQP::Client);
return d->disconnect();
}
void QAMQP::Client::reopen()
{
pd_func()->disconnect();
pd_func()->connect();
Q_D(QAMQP::Client);
d->disconnect();
d->connect();
}
void QAMQP::Client::setAuth(Authenticator *auth)
{
pd_func()->setAuth(auth);
Q_D(QAMQP::Client);
d->setAuth(auth);
}
Authenticator *QAMQP::Client::auth() const
{
return pd_func()->auth_.data();
Q_D(const QAMQP::Client);
return d->auth_.data();
}
bool QAMQP::Client::isSsl() const
{
return pd_func()->network_->isSsl();
Q_D(const QAMQP::Client);
return d->network_->isSsl();
}
void QAMQP::Client::setSsl(bool value)
{
pd_func()->network_->setSsl(value);
Q_D(QAMQP::Client);
d->network_->setSsl(value);
}
bool QAMQP::Client::autoReconnect() const
{
return pd_func()->network_->autoReconnect();
Q_D(const QAMQP::Client);
return d->network_->autoReconnect();
}
void QAMQP::Client::setAutoReconnect(bool value)
{
pd_func()->network_->setAutoReconnect(value);
Q_D(QAMQP::Client);
d->network_->setAutoReconnect(value);
}
bool QAMQP::Client::isConnected() const
{
return pd_func()->connection_->isConnected();
Q_D(const QAMQP::Client);
return d->connection_->isConnected();
}
void QAMQP::Client::addCustomProperty(const QString &name, const QString &value)
{
return pd_func()->connection_->addCustomProperty(name, value);
Q_D(QAMQP::Client);
return d->connection_->addCustomProperty(name, value);
}
QString QAMQP::Client::customProperty(const QString &name) const
{
return pd_func()->connection_->customProperty(name);
Q_D(const QAMQP::Client);
return d->connection_->customProperty(name);
}

View File

@ -1,12 +1,14 @@
#ifndef qamqp_amqp_h__
#define qamqp_amqp_h__
#ifndef QAMQP_H
#define QAMQP_H
#include <QObject>
#include <QUrl>
#include "amqp_global.h"
namespace QAMQP
{
class Exchange;
class Queue;
class ClientPrivate;
@ -15,22 +17,14 @@ namespace QAMQP
class Client : public QObject
{
Q_OBJECT
Q_PROPERTY(quint32 port READ port WRITE setPort);
Q_PROPERTY(QString host READ host WRITE setHost);
Q_PROPERTY(QString virtualHost READ virtualHost WRITE setVirtualHost);
Q_PROPERTY(QString user READ user WRITE setUser);
Q_PROPERTY(QString password READ password WRITE setPassword);
Q_PROPERTY(bool ssl READ isSsl WRITE setSsl);
Q_PROPERTY(bool autoReconnect READ autoReconnect WRITE setAutoReconnect);
Q_PROPERTY(bool connected READ isConnected );
Q_DISABLE_COPY(Client)
P_DECLARE_PRIVATE(QAMQP::Client)
friend class ConnectionPrivate;
friend class ChannelPrivate;
Q_PROPERTY(quint32 port READ port WRITE setPort)
Q_PROPERTY(QString host READ host WRITE setHost)
Q_PROPERTY(QString virtualHost READ virtualHost WRITE setVirtualHost)
Q_PROPERTY(QString user READ user WRITE setUser)
Q_PROPERTY(QString password READ password WRITE setPassword)
Q_PROPERTY(bool ssl READ isSsl WRITE setSsl)
Q_PROPERTY(bool autoReconnect READ autoReconnect WRITE setAutoReconnect)
Q_PROPERTY(bool connected READ isConnected )
public:
Client(QObject *parent = 0);
@ -49,8 +43,8 @@ namespace QAMQP
Queue *createQueue(int channelNumber = -1);
Queue *createQueue(const QString &name, int channelNumber = -1);
quint32 port() const;
void setPort(quint32 port);
quint16 port() const;
void setPort(quint16 port);
QString host() const;
void setHost(const QString &host);
@ -66,6 +60,7 @@ namespace QAMQP
void setAuth(Authenticator *auth);
Authenticator *auth() const;
void open();
void open(const QUrl &connectionString);
void close();
@ -83,15 +78,16 @@ namespace QAMQP
void connected();
void disconnected();
protected:
ClientPrivate * const pd_ptr;
private:
Q_DISABLE_COPY(Client)
Q_DECLARE_PRIVATE(QAMQP::Client)
QScopedPointer<ClientPrivate> d_ptr;
friend class ConnectionPrivate;
friend class ChannelPrivate;
friend struct ClientExceptionCleaner;
//void chanalConnect();
};
}
#endif // qamqp_amqp_h__
} // namespace QAMQP
#endif // QAMQP

View File

@ -1,43 +1,44 @@
#include "amqp_authenticator.h"
#include "amqp_frame.h"
QString QAMQP::AMQPlainAuthenticator::login() const
using namespace QAMQP;
AMQPlainAuthenticator::AMQPlainAuthenticator(const QString &l, const QString &p)
: login_(l),
password_(p)
{
}
AMQPlainAuthenticator::~AMQPlainAuthenticator()
{
}
QString AMQPlainAuthenticator::login() const
{
return login_;
}
QString QAMQP::AMQPlainAuthenticator::password() const
QString AMQPlainAuthenticator::password() const
{
return password_;
}
QAMQP::AMQPlainAuthenticator::AMQPlainAuthenticator( const QString & l /*= QString()*/, const QString & p /*= QString()*/ )
{
login_ = l;
password_ = p;
}
QAMQP::AMQPlainAuthenticator::~AMQPlainAuthenticator()
{
}
QString QAMQP::AMQPlainAuthenticator::type() const
QString AMQPlainAuthenticator::type() const
{
return "AMQPLAIN";
}
void QAMQP::AMQPlainAuthenticator::setLogin( const QString& l )
void AMQPlainAuthenticator::setLogin(const QString &l)
{
login_ = l;
}
void QAMQP::AMQPlainAuthenticator::setPassword( const QString &p )
void AMQPlainAuthenticator::setPassword(const QString &p)
{
password_ = p;
}
void QAMQP::AMQPlainAuthenticator::write( QDataStream & out )
void AMQPlainAuthenticator::write(QDataStream &out)
{
QAMQP::Frame::writeField('s', out, type());
QAMQP::Frame::TableField response;

View File

@ -1,32 +1,41 @@
#ifndef amqp_authenticator_h__
#define amqp_authenticator_h__
#include "amqp_global.h"
#include <QString>
#include <QDataStream>
#include "amqp_global.h"
namespace QAMQP
{
class Authenticator
{
public:
virtual ~Authenticator(){};
virtual ~Authenticator() {}
virtual QString type() const = 0;
virtual void write(QDataStream &out) = 0;
};
class AMQPlainAuthenticator : public Authenticator
{
QString login_, password_;
public:
AMQPlainAuthenticator(const QString &login = QString(), const QString &password = QString());
virtual ~AMQPlainAuthenticator();
QString login() const;
void setLogin(const QString &l);
QString password() const;
void setPassword(const QString &p);
virtual QString type() const;
virtual void write(QDataStream &out);
private:
QString login_;
QString password_;
};
}

View File

@ -1,5 +1,6 @@
#include "amqp_channel.h"
#include "amqp_channel_p.h"
#include "amqp_connection_p.h"
#include "amqp.h"
#include "amqp_p.h"
@ -10,59 +11,29 @@
using namespace QAMQP;
namespace QAMQP
{
int ChannelPrivate::nextChannelNumber_ = 0;
struct ChannelExceptionCleaner
{
/* this cleans up when the constructor throws an exception */
static inline void cleanup(Channel *that, ChannelPrivate *d)
{
#ifdef QT_NO_EXCEPTIONS
Q_UNUSED(that);
Q_UNUSED(d);
#else
Q_UNUSED(that);
Q_UNUSED(d);
#endif
}
};
}
//////////////////////////////////////////////////////////////////////////
QAMQP::Channel::Channel(int channelNumber /*= -1*/, Client * parent /*= 0*/ )
: pd_ptr(new ChannelPrivate(this))
QAMQP::Channel::Channel(int channelNumber, Client *parent)
: QObject(parent),
d_ptr(new ChannelPrivate(this))
{
QT_TRY {
pd_func()->init(channelNumber, parent);
} QT_CATCH(...) {
ChannelExceptionCleaner::cleanup(this, pd_func());
QT_RETHROW;
}
Q_D(QAMQP::Channel);
d->init(channelNumber, parent);
}
QAMQP::Channel::Channel( ChannelPrivate * d )
: pd_ptr(d)
QAMQP::Channel::Channel(ChannelPrivate *dd, Client *parent)
: QObject(parent),
d_ptr(dd)
{
}
QAMQP::Channel::~Channel()
{
QT_TRY {
QEvent e(QEvent::Destroy);
QCoreApplication::sendEvent(this, &e);
} QT_CATCH(const std::exception&) {
// if this fails we can't do anything about it but at least we are not allowed to throw.
}
}
void QAMQP::Channel::closeChannel()
{
P_D(Channel);
Q_D(QAMQP::Channel);
d->needOpen = true;
if (d->opened)
d->close(0, QString(), 0,0);
@ -71,29 +42,32 @@ void QAMQP::Channel::closeChannel()
void QAMQP::Channel::reopen()
{
Q_D(QAMQP::Channel);
closeChannel();
pd_func()->open();
d->open();
}
QString QAMQP::Channel::name()
QString QAMQP::Channel::name() const
{
return pd_func()->name;
Q_D(const QAMQP::Channel);
return d->name;
}
int QAMQP::Channel::channelNumber()
int QAMQP::Channel::channelNumber() const
{
return pd_func()->number;
Q_D(const QAMQP::Channel);
return d->number;
}
void QAMQP::Channel::setName(const QString &name)
{
pd_func()->name = name;
Q_D(QAMQP::Channel);
d->name = name;
}
void QAMQP::Channel::stateChanged(int state)
{
switch(ChannelPrivate::State(state))
{
switch(ChannelPrivate::State(state)) {
case ChannelPrivate::csOpened:
emit opened();
break;
@ -111,50 +85,52 @@ void QAMQP::Channel::stateChanged( int state )
void QAMQP::Channel::_q_method(const Frame::Method &frame)
{
pd_func()->_q_method(frame);
Q_D(QAMQP::Channel);
d->_q_method(frame);
}
bool QAMQP::Channel::isOpened() const
{
return pd_func()->opened;
Q_D(const QAMQP::Channel);
return d->opened;
}
void QAMQP::Channel::onOpen()
{
}
void QAMQP::Channel::onClose()
{
}
void QAMQP::Channel::setQOS(qint32 prefetchSize, quint16 prefetchCount)
{
pd_func()->setQOS(prefetchSize, prefetchCount);
Q_D(QAMQP::Channel);
d->setQOS(prefetchSize, prefetchCount);
}
//////////////////////////////////////////////////////////////////////////
int ChannelPrivate::nextChannelNumber_ = 0;
ChannelPrivate::ChannelPrivate(Channel * q)
: number(0)
, opened(false)
, needOpen(true)
, pq_ptr(q)
: number(0),
opened(false),
needOpen(true),
q_ptr(q)
{
}
ChannelPrivate::~ChannelPrivate()
{
}
void ChannelPrivate::init(int channelNumber, Client *parent)
{
Q_Q(QAMQP::Channel);
needOpen = channelNumber == -1 ? true : false;
number = channelNumber == -1 ? ++nextChannelNumber_ : channelNumber;
nextChannelNumber_ = qMax(channelNumber, (nextChannelNumber_ + 1));
pq_func()->setParent(parent);
q->setParent(parent);
client_ = parent;
}
@ -170,8 +146,7 @@ bool ChannelPrivate::_q_method( const QAMQP::Frame::Method & frame )
qDebug("Channel#%d:", number);
switch(frame.id())
{
switch (frame.id()) {
case miOpenOk:
openOk(frame);
break;
@ -199,20 +174,19 @@ void ChannelPrivate::_q_open()
void ChannelPrivate::sendFrame(const QAMQP::Frame::Base &frame)
{
if(client_)
{
client_->pd_func()->network_->sendFrame(frame);
if (client_) {
client_->d_func()->network_->sendFrame(frame);
}
}
void ChannelPrivate::open()
{
if (!needOpen || opened)
return;
if(!client_->pd_func()->connection_->isConnected())
if (!client_->d_func()->connection_->isConnected())
return;
qDebug("Open channel #%d", number);
QAMQP::Frame::Method frame(QAMQP::Frame::fcChannel, miOpen);
frame.setChannel(number);
@ -223,10 +197,8 @@ void ChannelPrivate::open()
sendFrame(frame);
}
void ChannelPrivate::flow()
{
}
void ChannelPrivate::flow(const QAMQP::Frame::Method &frame)
@ -236,7 +208,6 @@ void ChannelPrivate::flow( const QAMQP::Frame::Method & frame )
void ChannelPrivate::flowOk()
{
}
void ChannelPrivate::flowOk(const QAMQP::Frame::Method &frame)
@ -258,12 +229,13 @@ void ChannelPrivate::close(int code, const QString & text, int classId, int meth
stream << qint16(methodId);
frame.setArguments(arguments_);
client_->pd_func()->network_->sendFrame(frame);
client_->d_func()->network_->sendFrame(frame);
}
void ChannelPrivate::close(const QAMQP::Frame::Method &frame)
{
pq_func()->stateChanged(csClosed);
Q_Q(QAMQP::Channel);
q->stateChanged(csClosed);
qDebug(">> CLOSE");
QByteArray data = frame.arguments();
@ -278,7 +250,6 @@ void ChannelPrivate::close( const QAMQP::Frame::Method & frame )
qDebug(">> text: %s", qPrintable(text));
qDebug(">> class-id: %d", classId);
qDebug(">> method-id: %d", methodId);
}
void ChannelPrivate::closeOk()
@ -289,8 +260,9 @@ void ChannelPrivate::closeOk()
void ChannelPrivate::closeOk(const QAMQP::Frame::Method &frame)
{
Q_UNUSED(frame);
P_Q(Channel);
Q_UNUSED(frame)
Q_Q(QAMQP::Channel);
q->stateChanged(csClosed);
q->onClose();
opened = false;
@ -298,23 +270,24 @@ void ChannelPrivate::closeOk( const QAMQP::Frame::Method & frame )
void ChannelPrivate::openOk(const QAMQP::Frame::Method &frame)
{
Q_UNUSED(frame);
P_Q(Channel);
Q_UNUSED(frame)
Q_Q(QAMQP::Channel);
qDebug(">> OpenOK");
opened = true;
q->stateChanged(csOpened);
q->onOpen();
}
void ChannelPrivate::setQOS(qint32 prefetchSize, quint16 prefetchCount)
{
client_->pd_func()->connection_->pd_func()->setQOS(prefetchSize, prefetchCount, number, false);
client_->d_func()->connection_->d_func()->setQOS(prefetchSize, prefetchCount, number, false);
}
void ChannelPrivate::_q_disconnected()
{
nextChannelNumber_ = 0;
opened = false;
}
#include "moc_amqp_channel.cpp"

View File

@ -7,25 +7,23 @@
namespace QAMQP
{
class ChannelPrivate;
class Client;
class ChannelPrivate;
class Channel : public QObject, public Frame::MethodHandler
{
Q_OBJECT
Q_PROPERTY(int number READ channelNumber)
Q_PROPERTY(QString name READ name WRITE setName)
Q_PROPERTY(int number READ channelNumber);
Q_PROPERTY(QString name READ name WRITE setName);
P_DECLARE_PRIVATE(QAMQP::Channel)
Q_DISABLE_COPY(Channel)
public:
~Channel();
void closeChannel();
void reopen();
QString name();
int channelNumber();
QString name() const;
int channelNumber() const;
void setName(const QString &name);
void setQOS(qint32 prefetchSize, quint16 prefetchCount);
@ -37,24 +35,24 @@ namespace QAMQP
void flowChanged(bool enabled);
protected:
Q_DISABLE_COPY(Channel)
Q_DECLARE_PRIVATE(QAMQP::Channel)
Channel(int channelNumber = -1, Client *parent = 0);
Channel(ChannelPrivate * d);
Channel(ChannelPrivate *dd, Client *parent = 0);
QScopedPointer<ChannelPrivate> d_ptr;
Q_PRIVATE_SLOT(d_func(), void _q_open())
Q_PRIVATE_SLOT(d_func(), void _q_disconnected())
virtual void onOpen();
virtual void onClose();
ChannelPrivate * const pd_ptr;
private:
void stateChanged(int state);
friend class ClientPrivate;
void _q_method(const QAMQP::Frame::Method &frame);
Q_PRIVATE_SLOT(pd_func(), void _q_open())
Q_PRIVATE_SLOT(pd_func(), void _q_disconnected())
friend class ClientPrivate;
};
}
#ifdef QAMQP_P_INCLUDE
# include "amqp_channel_p.h"
#endif
#endif // amqp_channel_h__

View File

@ -12,10 +12,8 @@ namespace QAMQP
class ClientPrivate;
class ChannelPrivate
{
P_DECLARE_PUBLIC(QAMQP::Channel)
public:
enum MethodId
{
enum MethodId {
METHOD_ID_ENUM(miOpen, 10),
METHOD_ID_ENUM(miFlow, 20),
METHOD_ID_ENUM(miClose, 40)
@ -79,7 +77,9 @@ namespace QAMQP
bool opened;
bool needOpen;
Channel * const pq_ptr;
Q_DECLARE_PUBLIC(QAMQP::Channel)
Channel * const q_ptr;
};
}
#endif // amqp_channel_p_h__

View File

@ -12,50 +12,24 @@
using namespace QAMQP;
namespace QAMQP
{
struct ConnectionExceptionCleaner
{
/* this cleans up when the constructor throws an exception */
static inline void cleanup(Connection *that, ConnectionPrivate *d)
{
#ifdef QT_NO_EXCEPTIONS
Q_UNUSED(that);
Q_UNUSED(d);
#else
Q_UNUSED(that);
Q_UNUSED(d);
#endif
}
};
}
//////////////////////////////////////////////////////////////////////////
ConnectionPrivate::ConnectionPrivate(Connection * q)
: closed_(false)
, connected(false)
, pq_ptr(q)
: closed_(false),
connected(false),
q_ptr(q)
{
}
ConnectionPrivate::~ConnectionPrivate()
{
}
void ConnectionPrivate::init(Client * parent)
{
pq_func()->setParent(parent);
Q_Q(QAMQP::Connection);
q->setParent(parent);
client_ = parent;
heartbeatTimer_ = new QTimer(parent);
QObject::connect(heartbeatTimer_, SIGNAL(timeout()),
pq_func(), SLOT(_q_heartbeat()));
QObject::connect(heartbeatTimer_, SIGNAL(timeout()), q, SLOT(_q_heartbeat()));
}
void ConnectionPrivate::startOk()
@ -71,19 +45,14 @@ void ConnectionPrivate::startOk()
clientProperties.unite(customProperty);
QAMQP::Frame::serialize(stream, clientProperties);
client_->pd_func()->auth_->write(stream);
client_->d_func()->auth_->write(stream);
QAMQP::Frame::writeField('s', stream, "en_US");
frame.setArguments(arguments_);
client_->pd_func()->network_->sendFrame(frame);
client_->d_func()->network_->sendFrame(frame);
}
void ConnectionPrivate::secureOk()
{
}
void ConnectionPrivate::tuneOk()
@ -97,7 +66,7 @@ void ConnectionPrivate::tuneOk()
stream << qint16(heartbeatTimer_->interval() / 1000); //heartbeat
frame.setArguments(arguments_);
client_->pd_func()->network_->sendFrame(frame);
client_->d_func()->network_->sendFrame(frame);
}
void ConnectionPrivate::open()
@ -112,7 +81,7 @@ void ConnectionPrivate::open()
stream << qint8(0);
frame.setArguments(arguments_);
client_->pd_func()->network_->sendFrame(frame);
client_->d_func()->network_->sendFrame(frame);
}
void ConnectionPrivate::start(const QAMQP::Frame::Method &frame)
@ -144,7 +113,7 @@ void ConnectionPrivate::start( const QAMQP::Frame::Method & frame )
void ConnectionPrivate::secure(const QAMQP::Frame::Method &frame)
{
Q_UNUSED(frame);
Q_UNUSED(frame)
}
void ConnectionPrivate::tune(const QAMQP::Frame::Method &frame)
@ -165,30 +134,32 @@ void ConnectionPrivate::tune( const QAMQP::Frame::Method & frame )
qDebug(">> frame_max: %d", frame_max);
qDebug(">> heartbeat: %d", heartbeat);
if(heartbeatTimer_)
{
if (heartbeatTimer_) {
heartbeatTimer_->setInterval(heartbeat * 1000);
if (heartbeatTimer_->interval())
{
heartbeatTimer_->start();
} else {
else
heartbeatTimer_->stop();
}
}
tuneOk();
open();
}
void ConnectionPrivate::openOk(const QAMQP::Frame::Method &frame)
{
Q_UNUSED(frame);
Q_UNUSED(frame)
Q_Q(QAMQP::Connection);
qDebug(">> OpenOK");
connected = true;
pq_func()->openOk();
q->openOk();
}
void ConnectionPrivate::close(const QAMQP::Frame::Method &frame)
{
Q_Q(QAMQP::Connection);
qDebug(">> CLOSE");
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
@ -203,8 +174,8 @@ void ConnectionPrivate::close( const QAMQP::Frame::Method & frame )
qDebug(">> class-id: %d", classId);
qDebug(">> method-id: %d", methodId);
connected = false;
client_->pd_func()->network_->error(QAbstractSocket::RemoteHostClosedError);
QMetaObject::invokeMethod(pq_func(), "disconnected");
client_->d_func()->network_->error(QAbstractSocket::RemoteHostClosedError);
QMetaObject::invokeMethod(q, "disconnected");
}
void ConnectionPrivate::close(int code, const QString &text, int classId, int methodId)
@ -221,26 +192,27 @@ void ConnectionPrivate::close(int code, const QString & text, int classId, int m
stream << qint16(methodId);
frame.setArguments(arguments_);
client_->pd_func()->network_->sendFrame(frame);
client_->d_func()->network_->sendFrame(frame);
}
void ConnectionPrivate::closeOk()
{
QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miCloseOk);
connected = false;
client_->pd_func()->network_->sendFrame(frame);
client_->d_func()->network_->sendFrame(frame);
}
void ConnectionPrivate::closeOk( const QAMQP::Frame::Method & )
void ConnectionPrivate::closeOk(const QAMQP::Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(QAMQP::Connection);
connected = false;
QMetaObject::invokeMethod(pq_func(), "disconnected");
QMetaObject::invokeMethod(q, "disconnected");
if (heartbeatTimer_)
{
heartbeatTimer_->stop();
}
}
void ConnectionPrivate::setQOS(qint32 prefetchSize, quint16 prefetchCount, int channel, bool global)
@ -255,7 +227,7 @@ void ConnectionPrivate::setQOS( qint32 prefetchSize, quint16 prefetchCount, int
out << qint8(global ? 1 : 0);
frame.setArguments(arguments_);
client_->pd_func()->network_->sendFrame(frame);
client_->d_func()->network_->sendFrame(frame);
}
bool ConnectionPrivate::_q_method(const QAMQP::Frame::Method &frame)
@ -266,15 +238,14 @@ bool ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame )
qDebug() << "Connection:";
if (closed_)
{
if (closed_) {
if (frame.id() == miCloseOk)
closeOk(frame);
return true;
}
switch(MethodId(frame.id()))
{
switch (MethodId(frame.id())) {
case miStart:
start(frame);
break;
@ -297,101 +268,102 @@ bool ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame )
qWarning("Unknown method-id %d", frame.id());
return false;
}
return true;
}
void ConnectionPrivate::_q_heartbeat()
{
QAMQP::Frame::Heartbeat frame;
client_->pd_func()->network_->sendFrame(frame);
client_->d_func()->network_->sendFrame(frame);
}
//////////////////////////////////////////////////////////////////////////
Connection::Connection( Client * parent /*= 0*/ )
: pd_ptr(new ConnectionPrivate(this))
Connection::Connection(Client *parent)
: QObject(parent),
d_ptr(new ConnectionPrivate(this))
{
QT_TRY {
pd_func()->init(parent);
} QT_CATCH(...) {
ConnectionExceptionCleaner::cleanup(this, pd_func());
QT_RETHROW;
}
Q_D(QAMQP::Connection);
d->init(parent);
}
Connection::~Connection()
{
QT_TRY {
QEvent e(QEvent::Destroy);
QCoreApplication::sendEvent(this, &e);
} QT_CATCH(const std::exception&) {
// if this fails we can't do anything about it but at least we are not allowed to throw.
}
}
void Connection::startOk()
{
pd_func()->startOk();
Q_D(QAMQP::Connection);
d->startOk();
}
void Connection::secureOk()
{
pd_func()->secureOk();
Q_D(QAMQP::Connection);
d->secureOk();
}
void Connection::tuneOk()
{
pd_func()->tuneOk();
Q_D(QAMQP::Connection);
d->tuneOk();
}
void Connection::open()
{
pd_func()->open();
Q_D(QAMQP::Connection);
d->open();
}
void Connection::close(int code, const QString &text, int classId , int methodId)
{
pd_func()->close(code, text, classId, methodId);
Q_D(QAMQP::Connection);
d->close(code, text, classId, methodId);
}
void Connection::closeOk()
{
pd_func()->closeOk();
emit disconnect();
Q_D(QAMQP::Connection);
d->closeOk();
Q_EMIT disconnect();
}
void Connection::openOk()
{
emit connected();
Q_EMIT connected();
}
void Connection::_q_method(const QAMQP::Frame::Method &frame)
{
pd_func()->_q_method(frame);
Q_D(QAMQP::Connection);
d->_q_method(frame);
}
bool Connection::isConnected() const
{
return pd_func()->connected;
Q_D(const QAMQP::Connection);
return d->connected;
}
void Connection::setQOS(qint32 prefetchSize, quint16 prefetchCount)
{
pd_func()->setQOS(prefetchSize, prefetchCount, 0, true);
Q_D(QAMQP::Connection);
d->setQOS(prefetchSize, prefetchCount, 0, true);
}
void Connection::addCustomProperty(const QString &name, const QString &value)
{
pd_func()->customProperty[name] = value;
Q_D(QAMQP::Connection);
d->customProperty[name] = value;
}
QString Connection::customProperty(const QString &name) const
{
if(pd_func()->customProperty.contains(name))
{
return pd_func()->customProperty.value(name).toString();
}
Q_D(const QAMQP::Connection);
if (d->customProperty.contains(name))
return d->customProperty.value(name).toString();
return QString();
}
#include "moc_amqp_connection.cpp"

View File

@ -7,19 +7,16 @@
namespace QAMQP
{
class ConnectionPrivate;
class ChannelPrivate;
class ClientPrivate;
class Client;
class ClientPrivate;
class ChannelPrivate;
class ConnectionPrivate;
class Connection : public QObject, public Frame::MethodHandler
{
Q_OBJECT
P_DECLARE_PRIVATE(QAMQP::Connection)
Q_DISABLE_COPY(Connection)
Connection(Client * parent = 0);
public:
~Connection();
virtual ~Connection();
void addCustomProperty(const QString &name, const QString &value);
QString customProperty(const QString &name) const;
@ -38,22 +35,22 @@ namespace QAMQP
Q_SIGNALS:
void disconnected();
void connected();
protected:
ConnectionPrivate * const pd_ptr;
private:
Q_DISABLE_COPY(Connection)
Q_DECLARE_PRIVATE(Connection)
QScopedPointer<ConnectionPrivate> d_ptr;
Connection(Client * parent = 0);
void openOk();
friend class ClientPrivate;
friend class ChannelPrivate;
void _q_method(const QAMQP::Frame::Method &frame);
Q_PRIVATE_SLOT(pd_func(), void _q_heartbeat());
Q_PRIVATE_SLOT(d_func(), void _q_heartbeat())
};
}
// Include private header so MOC won't complain
#ifdef QAMQP_P_INCLUDE
# include "amqp_connection_p.h"
#endif
#endif // amqp_connection_h__

View File

@ -11,12 +11,11 @@ namespace QAMQP
{
class Client;
class ClientPrivate;
class Connection;
class ConnectionPrivate
{
P_DECLARE_PUBLIC(QAMQP::Connection)
public:
enum MethodId
{
enum MethodId {
METHOD_ID_ENUM(miStart, 10),
METHOD_ID_ENUM(miSecure, 20),
METHOD_ID_ENUM(miTune, 30),
@ -26,6 +25,7 @@ namespace QAMQP
ConnectionPrivate(Connection *q);
~ConnectionPrivate();
void init(Client *parent);
void startOk();
void secureOk();
@ -40,6 +40,7 @@ namespace QAMQP
void openOk(const QAMQP::Frame::Method &frame);
void close(const QAMQP::Frame::Method &frame);
void closeOk(const QAMQP::Frame::Method &frame);
bool _q_method(const QAMQP::Frame::Method &frame);
void _q_heartbeat();
@ -50,10 +51,12 @@ namespace QAMQP
bool connected;
QPointer<QTimer> heartbeatTimer_;
Connection * const pq_ptr;
QAMQP::Frame::TableField customProperty;
Q_DECLARE_PUBLIC(QAMQP::Connection)
Connection * const q_ptr;
};
}
#endif // amqp_connection_p_h__

View File

@ -9,36 +9,11 @@ using namespace QAMQP::Frame;
#include <QDebug>
#include <QDataStream>
namespace QAMQP
Exchange::Exchange(int channelNumber, Client *parent)
: Channel(new ExchangePrivate(this), parent)
{
struct ExchangeExceptionCleaner
{
/* this cleans up when the constructor throws an exception */
static inline void cleanup(Exchange *that, ExchangePrivate *d)
{
#ifdef QT_NO_EXCEPTIONS
Q_UNUSED(that);
Q_UNUSED(d);
#else
Q_UNUSED(that);
Q_UNUSED(d);
#endif
}
};
}
Exchange::Exchange(int channelNumber, Client * parent /*= 0*/ )
: Channel(new ExchangePrivate(this))
{
QT_TRY {
pd_func()->init(channelNumber, parent);
} QT_CATCH(...) {
ExchangeExceptionCleaner::cleanup(this, pd_func());
QT_RETHROW;
}
Q_D(QAMQP::Exchange);
d->init(channelNumber, parent);
}
Exchange::~Exchange()
@ -48,95 +23,97 @@ Exchange::~Exchange()
void Exchange::onOpen()
{
P_D(Exchange);
Q_D(QAMQP::Exchange);
if (d->delayedDeclare)
{
d->declare();
}
}
void Exchange::onClose()
{
pd_func()->remove(true, true);
Q_D(QAMQP::Exchange);
d->remove(true, true);
}
Exchange::ExchangeOptions Exchange::option() const
{
return pd_func()->options;
Q_D(const QAMQP::Exchange);
return d->options;
}
QString Exchange::type() const
{
return pd_func()->type;
Q_D(const QAMQP::Exchange);
return d->type;
}
void Exchange::declare(const QString &type, ExchangeOptions option , const TableField &arg)
{
P_D(Exchange);
Q_D(QAMQP::Exchange);
d->options = option;
d->type = type;
d->arguments = arg;
d->declare();
}
void Exchange::remove( bool ifUnused /*= true*/, bool noWait /*= true*/ )
void Exchange::remove(bool ifUnused, bool noWait)
{
pd_func()->remove(ifUnused, noWait);
Q_D(QAMQP::Exchange);
d->remove(ifUnused, noWait);
}
void Exchange::bind(QAMQP::Queue *queue)
{
queue->bind(this, pd_func()->name);
Q_D(QAMQP::Exchange);
queue->bind(this, d->name);
}
void Exchange::bind(const QString &queueName)
{
Q_UNUSED(queueName);
qWarning("Not implement");
qWarning("Not implemented");
}
void Exchange::bind(const QString &queueName, const QString &key)
{
Q_UNUSED(queueName);
Q_UNUSED(key);
qWarning("Not implement");
qWarning("Not implemented");
}
void Exchange::publish(const QString &message, const QString &key, const MessageProperties &prop)
{
pd_func()->publish(message.toUtf8(), key, QString::fromLatin1("text.plain"), QVariantHash(), prop);
Q_D(QAMQP::Exchange);
d->publish(message.toUtf8(), key, QLatin1String("text.plain"), QVariantHash(), prop);
}
void Exchange::publish( const QByteArray & message, const QString & key, const QString &mimeType, const MessageProperties &prop )
void Exchange::publish(const QByteArray &message, const QString &key,
const QString &mimeType, const MessageProperties &prop)
{
pd_func()->publish(message, key, mimeType, QVariantHash(), prop);
Q_D(QAMQP::Exchange);
d->publish(message, key, mimeType, QVariantHash(), prop);
}
void Exchange::publish( const QByteArray & message, const QString & key, const QVariantHash &headers, const QString &mimeType, const MessageProperties &prop )
void Exchange::publish(const QByteArray &message, const QString &key,
const QVariantHash &headers, const QString &mimeType,
const MessageProperties &prop)
{
pd_func()->publish(message, key, mimeType, headers, prop);
Q_D(QAMQP::Exchange);
d->publish(message, key, mimeType, headers, prop);
}
//////////////////////////////////////////////////////////////////////////
ExchangePrivate::ExchangePrivate(Exchange * q)
:ChannelPrivate(q)
, delayedDeclare(false)
, declared(false)
: ChannelPrivate(q),
delayedDeclare(false),
declared(false)
{
}
ExchangePrivate::~ExchangePrivate()
{
}
bool ExchangePrivate::_q_method(const QAMQP::Frame::Method &frame)
{
if (ChannelPrivate::_q_method(frame))
@ -145,8 +122,7 @@ bool ExchangePrivate::_q_method( const QAMQP::Frame::Method & frame )
if (frame.methodClass() != QAMQP::Frame::fcExchange)
return false;
switch(frame.id())
{
switch(frame.id()) {
case miDeclareOk:
declareOk(frame);
break;
@ -156,27 +132,31 @@ bool ExchangePrivate::_q_method( const QAMQP::Frame::Method & frame )
default:
break;
}
return true;
}
void ExchangePrivate::declareOk( const QAMQP::Frame::Method & )
void ExchangePrivate::declareOk(const QAMQP::Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(QAMQP::Exchange);
qDebug() << "Declared exchange: " << name;
declared = true;
QMetaObject::invokeMethod(pq_func(), "declared");
QMetaObject::invokeMethod(q, "declared");
}
void ExchangePrivate::deleteOk( const QAMQP::Frame::Method & )
void ExchangePrivate::deleteOk(const QAMQP::Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(QAMQP::Exchange);
qDebug() << "Deleted exchange: " << name;
declared = false;
QMetaObject::invokeMethod(pq_func(), "removed");
QMetaObject::invokeMethod(q, "removed");
}
void ExchangePrivate::declare()
{
if(!opened)
{
if (!opened) {
delayedDeclare = true;
return;
}
@ -200,7 +180,7 @@ void ExchangePrivate::declare( )
delayedDeclare = false;
}
void ExchangePrivate::remove( bool ifUnused /*= true*/, bool noWait /*= true*/ )
void ExchangePrivate::remove(bool ifUnused, bool noWait)
{
QAMQP::Frame::Method frame(QAMQP::Frame::fcExchange, miDelete);
frame.setChannel(number);
@ -221,7 +201,9 @@ void ExchangePrivate::remove( bool ifUnused /*= true*/, bool noWait /*= true*/ )
sendFrame(frame);
}
void ExchangePrivate::publish( const QByteArray & message, const QString & key, const QString &mimeType /*= QString::fromLatin1("text/plain")*/, const QVariantHash & headers, const Exchange::MessageProperties & prop )
void ExchangePrivate::publish(const QByteArray &message, const QString &key,
const QString &mimeType, const QVariantHash & headers,
const Exchange::MessageProperties &prop)
{
QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmPublish);
frame.setChannel(number);
@ -236,7 +218,6 @@ void ExchangePrivate::publish( const QByteArray & message, const QString & key,
frame.setArguments(arguments_);
sendFrame(frame);
QAMQP::Frame::Content content(QAMQP::Frame::fcBasic);
content.setChannel(number);
content.setProperty(Content::cpContentType, mimeType);
@ -247,26 +228,21 @@ void ExchangePrivate::publish( const QByteArray & message, const QString & key,
Exchange::MessageProperties::ConstIterator i;
for (i = prop.begin(); i != prop.end(); ++i)
{
content.setProperty(i.key(), i.value());
}
content.setBody(message);
sendFrame(content);
int fullSize = message.size();
for (int sended_ = 0; sended_ < fullSize; sended_+= (FRAME_MAX - 7))
{
for (int sended_ = 0; sended_ < fullSize; sended_+= (FRAME_MAX - 7)) {
QAMQP::Frame::ContentBody body;
QByteArray partition_ = message.mid(sended_, (FRAME_MAX - 7));
body.setChannel(number);
body.setBody(partition_);
sendFrame(body);
}
}
void ExchangePrivate::_q_disconnected()
{
ChannelPrivate::_q_disconnected();

View File

@ -2,31 +2,24 @@
#define amqp_exchange_h__
#include "amqp_channel.h"
namespace QAMQP
{
using namespace QAMQP::Frame;
class Client;
class Queue;
class ClientPrivate;
class ExchangePrivate;
using namespace QAMQP::Frame;
class Exchange : public Channel
{
Q_OBJECT;
Exchange(int channelNumber = -1, Client * parent = 0);
Q_PROPERTY(QString type READ type);
Q_PROPERTY(ExchangeOptions option READ option );
P_DECLARE_PRIVATE(QAMQP::Exchange)
Q_DISABLE_COPY(Exchange);
friend class ClientPrivate;
protected:
void onOpen();
void onClose();
Q_OBJECT
Q_PROPERTY(QString type READ type)
Q_PROPERTY(ExchangeOptions option READ option)
Q_ENUMS(ExchangeOption)
public:
enum ExchangeOption {
NoOptions = 0x0,
Passive = 0x01,
@ -39,26 +32,45 @@ namespace QAMQP
typedef QHash<QAMQP::Frame::Content::Property, QVariant> MessageProperties;
~Exchange();
virtual ~Exchange();
QString type() const;
ExchangeOptions option() const;
void declare(const QString &type = QString::fromLatin1("direct"), ExchangeOptions option = NoOptions, const TableField & arg = TableField());
void declare(const QString &type = QLatin1String("direct"),
ExchangeOptions option = NoOptions,
const TableField &arg = TableField());
void remove(bool ifUnused = true, bool noWait = true);
void bind(QAMQP::Queue *queue);
void bind(const QString &queueName);
void bind(const QString &queueName, const QString &key);
void publish(const QString & message, const QString & key, const MessageProperties &property = MessageProperties() );
void publish(const QByteArray & message, const QString & key, const QString &mimeType, const MessageProperties &property = MessageProperties());
void publish(const QByteArray & message, const QString & key, const QVariantHash &headers, const QString &mimeType, const MessageProperties &property = MessageProperties());
void publish(const QString &message, const QString &key,
const MessageProperties &property = MessageProperties());
void publish(const QByteArray &message, const QString &key,
const QString &mimeType, const MessageProperties &property = MessageProperties());
void publish(const QByteArray &message, const QString &key, const QVariantHash &headers,
const QString &mimeType, const MessageProperties &property = MessageProperties());
Q_SIGNALS:
void declared();
void removed();
protected:
void onOpen();
void onClose();
private:
Q_DISABLE_COPY(Exchange)
Q_DECLARE_PRIVATE(QAMQP::Exchange)
Exchange(int channelNumber = -1, Client * parent = 0);
friend class ClientPrivate;
};
}
Q_DECLARE_OPERATORS_FOR_FLAGS(QAMQP::Exchange::ExchangeOptions)
#endif // amqp_exchange_h__

View File

@ -1,16 +1,15 @@
#include "amqp_channel_p.h"
#define METHOD_ID_ENUM(name, id) name = id, name ## Ok
namespace QAMQP
{
using namespace QAMQP::Frame;
class ExchangePrivate: public ChannelPrivate
{
P_DECLARE_PUBLIC(QAMQP::Exchange)
public:
enum MethodId
{
enum MethodId {
METHOD_ID_ENUM(miDeclare, 10),
METHOD_ID_ENUM(miDelete, 20)
};
@ -24,7 +23,10 @@ namespace QAMQP
void declareOk(const QAMQP::Frame::Method &frame);
void deleteOk(const QAMQP::Frame::Method &frame);
void publish(const QByteArray & message, const QString & key, const QString &mimeType = QString::fromLatin1("text/plain"), const QVariantHash & headers = QVariantHash(), const Exchange::MessageProperties & properties = Exchange::MessageProperties());
void publish(const QByteArray &message, const QString &key,
const QString &mimeType = QLatin1String("text/plain"),
const QVariantHash &headers = QVariantHash(),
const Exchange::MessageProperties &properties = Exchange::MessageProperties());
QString type;
Exchange::ExchangeOptions options;
@ -36,5 +38,7 @@ namespace QAMQP
bool delayedDeclare;
bool declared;
Q_DECLARE_PUBLIC(QAMQP::Exchange)
};
}

View File

@ -1,12 +1,18 @@
#include "amqp_frame.h"
#include <float.h>
#include <QDateTime>
#include <QList>
#include <QDebug>
#include <float.h>
using namespace QAMQP::Frame;
Base::Base( Type type ) : size_(0), type_(type), channel_(0) {}
Base::Base(Type type)
: size_(0),
type_(type),
channel_(0)
{
}
Base::Base(QDataStream &raw)
{
@ -41,7 +47,6 @@ void QAMQP::Frame::Base::writeHeader( QDataStream & stream ) const
stream << type_;
stream << channel_;
stream << qint32(size());
}
void QAMQP::Frame::Base::writeEnd(QDataStream &stream) const
@ -56,15 +61,13 @@ void QAMQP::Frame::Base::readHeader( QDataStream & stream )
stream >> type_;
stream >> channel_;
stream >> size_;
}
void QAMQP::Frame::Base::readEnd(QDataStream &stream)
{
unsigned char end_ = 0;
stream.readRawData(reinterpret_cast<char*>(&end_), sizeof(end_));
if(end_ != FRAME_END )
{
if (end_ != FRAME_END) {
qWarning("Wrong end of frame");
}
}
@ -83,11 +86,9 @@ void QAMQP::Frame::Base::toStream( QDataStream & stream ) const
//////////////////////////////////////////////////////////////////////////
QAMQP::Frame::Method::Method(MethodClass methodClass, qint16 id)
: Base(ftMethod), methodClass_(methodClass), id_(id)
{
}
QAMQP::Frame::Method::Method(QDataStream &raw)
@ -98,7 +99,6 @@ QAMQP::Frame::Method::Method( QDataStream& raw )
QAMQP::Frame::Method::Method(): Base(ftMethod)
{
}
MethodClass QAMQP::Frame::Method::methodClass() const
@ -142,10 +142,8 @@ void QAMQP::Frame::Method::writePayload( QDataStream & stream ) const
stream.writeRawData(arguments_.data(), arguments_.size());
}
//////////////////////////////////////////////////////////////////////////
QVariant QAMQP::Frame::readField(qint8 valueType, QDataStream &s)
{
QVariant value;
@ -153,8 +151,7 @@ QVariant QAMQP::Frame::readField( qint8 valueType, QDataStream &s )
qint8 nameSize_ = 0;
char octet = 0;
switch(valueType)
{
switch(valueType) {
case 't':
s.readRawData(&octet, sizeof(octet));
value = QVariant::fromValue<bool>(octet > 0);
@ -237,11 +234,7 @@ QVariant QAMQP::Frame::readField( qint8 valueType, QDataStream &s )
s >> nameSize_;
tmp.resize(nameSize_);
s.readRawData(tmp.data(), tmp.size());
#if QT_VERSION < 0x050000
value = QString::fromAscii(tmp.data(), nameSize_);
#else // For Qt5
value = QString::fromLatin1(tmp.data(), nameSize_);
#endif
break;
case 'S':
{
@ -251,11 +244,7 @@ QVariant QAMQP::Frame::readField( qint8 valueType, QDataStream &s )
tmp.resize(length_);
}
s.readRawData(tmp.data(), tmp.size());
#if QT_VERSION < 0x050000
value = QString::fromAscii(tmp.data(), tmp.size());
#else // For Qt5
value = QString::fromLatin1(tmp.data(), tmp.size());
#endif
break;
case 'A':
{
@ -263,8 +252,7 @@ QVariant QAMQP::Frame::readField( qint8 valueType, QDataStream &s )
qint8 type = 0;
s >> length_;
QList<QVariant> array_;
for (int i =0; i < length_; ++i)
{
for (int i =0; i < length_; ++i) {
s >> type;
array_ << readField(type, s);
}
@ -299,10 +287,8 @@ QDataStream & QAMQP::Frame::deserialize( QDataStream & stream, QAMQP::Frame::Tab
stream >> data;
QDataStream s(&data, QIODevice::ReadOnly);
while(!s.atEnd())
{
while(!s.atEnd()) {
qint8 valueType = 0;
QString name = readField('s', s).toString();
s >> valueType;
f[name] = readField(valueType, s);
@ -316,13 +302,12 @@ QDataStream & QAMQP::Frame::serialize( QDataStream & stream, const TableField &
QByteArray data;
QDataStream s(&data, QIODevice::WriteOnly);
TableField::ConstIterator i;
for(i = f.begin(); i != f.end(); ++i)
{
for (i = f.begin(); i != f.end(); ++i) {
writeField('s', s, i.key());
writeField(s, i.value());
}
if(data.isEmpty())
{
if (data.isEmpty()) {
stream << qint32(0);
} else {
stream << data;
@ -334,10 +319,8 @@ QDataStream & QAMQP::Frame::serialize( QDataStream & stream, const TableField &
void QAMQP::Frame::print(const TableField &f)
{
TableField::ConstIterator i;
for(i = f.begin(); i != f.end(); ++i)
{
switch(i.value().type())
{
for (i = f.begin(); i != f.end(); ++i) {
switch(i.value().type()) {
case QVariant::Hash:
qDebug() << "\t" << qPrintable(i.key()) << ": FIELD_TABLE";
break;
@ -356,8 +339,7 @@ void QAMQP::Frame::writeField( qint8 valueType, QDataStream &s, const QVariant &
if (withType)
s << valueType;
switch(valueType)
{
switch (valueType) {
case 't':
s << (value.toBool() ? qint8(1) : qint8(0));
break;
@ -402,22 +384,14 @@ void QAMQP::Frame::writeField( qint8 valueType, QDataStream &s, const QVariant &
{
QString str = value.toString();
s << quint8(str.length());
#if QT_VERSION < 0x050000
s.writeRawData(str.toAscii().data(), str.length());
#else // For Qt5
s.writeRawData(str.toLatin1().data(), str.length());
#endif
}
break;
case 'S':
{
QString str = value.toString();
s << quint32(str.length());
#if QT_VERSION < 0x050000
s.writeRawData(str.toAscii().data(), str.length());
#else // For Qt5
s.writeRawData(str.toLatin1().data(), str.length());
#endif
}
break;
case 'A':
@ -425,10 +399,8 @@ void QAMQP::Frame::writeField( qint8 valueType, QDataStream &s, const QVariant &
QList<QVariant> array_(value.toList());
s << quint32(array_.count());
for (int i =0; i < array_.count(); ++i)
{
writeField(s, array_.at(i));
}
}
break;
case 'T':
s << qulonglong(value.toDateTime().toMSecsSinceEpoch());
@ -449,8 +421,7 @@ void QAMQP::Frame::writeField( qint8 valueType, QDataStream &s, const QVariant &
void QAMQP::Frame::writeField(QDataStream &s, const QVariant &value)
{
char type = 0;
switch(value.type())
{
switch (value.type()) {
case QVariant::Bool:
type = 't';
break;
@ -502,7 +473,9 @@ void QAMQP::Frame::writeField( QDataStream &s, const QVariant & value )
case QVariant::List:
type = 'A';
break;
default:;
default:
qDebug() << Q_FUNC_INFO << "unhandled variant type: " << value.type();
}
if (type)
@ -511,17 +484,19 @@ void QAMQP::Frame::writeField( QDataStream &s, const QVariant & value )
//////////////////////////////////////////////////////////////////////////
QAMQP::Frame::Content::Content():Base(ftHeader)
QAMQP::Frame::Content::Content()
: Base(ftHeader)
{
}
QAMQP::Frame::Content::Content( MethodClass methodClass ):Base(ftHeader)
QAMQP::Frame::Content::Content(MethodClass methodClass)
: Base(ftHeader)
{
methodClass_ = methodClass;
}
QAMQP::Frame::Content::Content( QDataStream& raw ): Base(raw)
QAMQP::Frame::Content::Content(QDataStream &raw)
: Base(raw)
{
readPayload(raw);
}
@ -541,10 +516,7 @@ qint32 QAMQP::Frame::Content::size() const
qint16 prop_ = 0;
foreach (int p, properties_.keys())
{
prop_ |= p;
}
out << prop_;
if (prop_ & cpContentType)
@ -673,10 +645,13 @@ qlonglong QAMQP::Frame::Content::bodySize() const
}
//////////////////////////////////////////////////////////////////////////
ContentBody::ContentBody() : Base(ftBody)
{}
ContentBody::ContentBody()
: Base(ftBody)
{
}
QAMQP::Frame::ContentBody::ContentBody( QDataStream& raw ): Base(raw)
QAMQP::Frame::ContentBody::ContentBody(QDataStream &raw)
: Base(raw)
{
readPayload(raw);
}
@ -709,8 +684,10 @@ qint32 QAMQP::Frame::ContentBody::size() const
//////////////////////////////////////////////////////////////////////////
QAMQP::Frame::Heartbeat::Heartbeat() : Base(ftHeartbeat) {}
QAMQP::Frame::Heartbeat::Heartbeat()
: Base(ftHeartbeat)
{
}
void QAMQP::Frame::Heartbeat::readPayload(QDataStream &) {}
void QAMQP::Frame::Heartbeat::writePayload(QDataStream &) const {}

View File

@ -20,36 +20,36 @@
#define AMQP_BASIC_APP_ID_FLAG (1 << 3)
#define AMQP_BASIC_CLUSTER_ID_FLAG (1 << 2)
/*!
Library namespace
@namespace QAMQP
/**
* Library namespace
* @namespace QAMQP
*/
namespace QAMQP
{
class QueuePrivate;
/*!
Frame namespace
@namespace Frame
*/
namespace Frame
{
typedef quint16 channel_t;
/*!
@brief Header size in bytes
/*
* @brief Header size in bytes
*/
static const qint64 HEADER_SIZE = 7;
/*!
@brief Frame end indicator size in bytes
/*
* @brief Frame end indicator size in bytes
*/
static const qint64 FRAME_END_SIZE = 1;
/*!
@brief Frame end marker
/*
* @brief Frame end marker
*/
static const quint8 FRAME_END = 0xCE;
/*!
@brief Frame type
/*
* @brief Frame type
*/
enum Type
{
@ -59,17 +59,17 @@ namespace QAMQP
ftHeartbeat = 8 /*!< Used define heartbeat frame */
};
/*!
@brief Frame method class
@enum MethodClass
/*
* @brief Frame method class
* @enum MethodClass
*/
enum MethodClass
{
fcConnection = 10, /*!< Define class of methods related to connection */
fcChannel = 20, /*!< Define class of methods related to channel */
fcExchange = 40, /*!< Define class of methods related to exchange */
fcQueue = 50, /*!< Define class of methods related to queue */
fcBasic = 60, /*!< Define class of methods related to basic command */
fcConnection = 10, // Define class of methods related to connection
fcChannel = 20, // Define class of methods related to channel
fcExchange = 40, // Define class of methods related to exchange
fcQueue = 50, // Define class of methods related to queue
fcBasic = 60, // Define class of methods related to basic command
fcTx = 90,
};
@ -77,13 +77,12 @@ namespace QAMQP
{
qint8 scale;
quint32 value;
};
/*!
@brief Definition implementation of TableField type
@detailed Define implementation TableField type in builtin Qt types. Key contains field name, value contains field data.
It can by any type witch support serialization in AMQP types.
/*
* @brief Definition implementation of TableField type
* @detailed Define implementation TableField type in builtin Qt types. Key contains field name, value contains field data.
* It can by any type witch support serialization in AMQP types.
*/
typedef QHash<QString, QVariant> TableField;
@ -94,68 +93,68 @@ namespace QAMQP
void writeField( qint8 valueType, QDataStream &s, const QVariant & value, bool withType = false );
void print( const QAMQP::Frame::TableField & f );
/*!
@brief Base class for any frames.
@detailed Implement main methods for serialize and deserialize raw frame data.
All frames start with a 7-octet header composed of a type field (octet), a channel field (short integer) and a
size field (long integer):
@code Frame struct
0 1 3 7 size+7 size+8
+------+---------+---------+ +-------------+ +-----------+
| type | channel | size | | payload | | frame-end |
+------+---------+---------+ +-------------+ +-----------+
@endcode
octet short long 'size' octets octet
/*
* @brief Base class for any frames.
* @detailed Implement main methods for serialize and deserialize raw frame data.
* All frames start with a 7-octet header composed of a type field (octet), a channel field (short integer) and a
* size field (long integer):
* @code Frame struct
* 0 1 3 7 size+7 size+8
* +------+---------+---------+ +-------------+ +-----------+
* | type | channel | size | | payload | | frame-end |
* +------+---------+---------+ +-------------+ +-----------+
* @endcode
* octet short long 'size' octets octet
*/
class Base
{
public:
/*!
Base class constructor.
@detailed Construct frame class for sending.
@param type Define type of constructed frame.
/*
* Base class constructor.
* @detailed Construct frame class for sending.
* @param type Define type of constructed frame.
*/
Base(Type type);
/*!
Base class constructor.
@detailed Construct frame class from received raw data.
@param raw Data stream for reading source data.
/*
* Base class constructor.
* @detailed Construct frame class from received raw data.
* @param raw Data stream for reading source data.
*/
Base(QDataStream& raw);
/*!
Base class virtual destructor
/*
* Base class virtual destructor
*/
virtual ~Base();
/*!
Frame type
@detailed Return type of current frame.
/*
* Frame type
* @detailed Return type of current frame.
*/
Type type() const;
/*!
Set number of associated channel.
@param channel Number of channel.
@sa channel()
/*
* Set number of associated channel.
* @param channel Number of channel.
* @sa channel()
*/
void setChannel(qint16 channel);
/*!
Return number of associated channel.
@sa setChannel()
/*
* Return number of associated channel.
* @sa setChannel()
*/
qint16 channel() const;
/*!
Return size of frame.
/*
* Return size of frame.
*/
virtual qint32 size() const;
/*!
Output frame to stream.
@param stream Stream for serilize frame.
/*
* Output frame to stream.
* @param stream Stream for serilize frame.
*/
void toStream(QDataStream &stream) const;
@ -169,71 +168,71 @@ namespace QAMQP
void readEnd(QDataStream &stream);
qint32 size_;
private:
qint8 type_;
qint16 channel_;
};
/*!
@brief Class for working with method frames.
@detailed Implement main methods for serialize and deserialize raw method frame data.
Method frame bodies consist of an invariant list of data fields, called "arguments". All method bodies start
with identifier numbers for the class and method:
@code Frame struct
0 2 4
+----------+-----------+-------------- - -
| class-id | method-id | arguments...
+----------+-----------+-------------- - -
short short ...
@endcode
/*
* @brief Class for working with method frames.
* @detailed Implement main methods for serialize and deserialize raw method frame data.
* Method frame bodies consist of an invariant list of data fields, called "arguments". All method bodies start
* with identifier numbers for the class and method:
* @code Frame struct
* 0 2 4
* +----------+-----------+-------------- - -
* | class-id | method-id | arguments...
* +----------+-----------+-------------- - -
* short short ...
* @endcode
*/
class Method : public Base
{
public:
/*!
Method class constructor.
@detailed Construct frame class for sending.
/*
* Method class constructor.
* @detailed Construct frame class for sending.
*/
Method();
/*!
Method class constructor.
@detailed Construct frame class for sending.
@param methodClass Define method class id of constructed frame.
@param id Define method id of constructed frame.
/*
* Method class constructor.
* @detailed Construct frame class for sending.
* @param methodClass Define method class id of constructed frame.
* @param id Define method id of constructed frame.
*/
Method(MethodClass methodClass, qint16 id);
/*!
Method class constructor.
@detailed Construct frame class from received raw data.
@param raw Data stream for reading source data.
/*
* Method class constructor.
* @detailed Construct frame class from received raw data.
* @param raw Data stream for reading source data.
*/
Method(QDataStream &raw);
/*!
Method class type.
/*
* Method class type.
*/
MethodClass methodClass() const;
/*!
Method id.
/*
* Method id.
*/
qint16 id() const;
qint32 size() const;
/*!
Set arguments for method.
@param data Serialized method arguments.
@sa arguments
/*
* Set arguments for method.
* @param data Serialized method arguments.
* @sa arguments
*/
void setArguments(const QByteArray &data);
/*!
Return arguments for method.
@sa setArguments
/*
* Return arguments for method.
* @sa setArguments
*/
QByteArray arguments() const;
@ -243,49 +242,49 @@ namespace QAMQP
short methodClass_;
qint16 id_;
QByteArray arguments_;
};
/*!
@brief Class for working with content frames.
@detailed Implement main methods for serialize and deserialize raw content frame data.
A content header payload has this format:
@code Frame struct
+----------+--------+-----------+----------------+------------- - -
| class-id | weight | body size | property flags | property list...
+----------+--------+-----------+----------------+------------- - -
short short long long short remainder...
@endcode
| Property | Description |
| ---------- | ----------- |
|cpContentType | MIME content type |
| ocpContentEncoding | MIME content encoding |
| cpHeaders | message header field table |
| cpDeliveryMode| nonpersistent (1) or persistent (2) |
| cpPriority | message priority, 0 to 9 |
| cpCorrelationId | application correlation identifier |
| cpReplyTo | address to reply to |
| cpExpiration | message expiration specification |
| cpMessageId | application message identifier |
| cpTimestamp | message timestamp |
| cpType | message type name |
| cpUserId | creating user id |
| cpAppId | creating application id |
| cpClusterID| cluster ID |
Default property:
@sa setProperty
@sa property
/*
* @brief Class for working with content frames.
* @detailed Implement main methods for serialize and deserialize raw content frame data.
* A content header payload has this format:
* @code Frame struct
* +----------+--------+-----------+----------------+------------- - -
* | class-id | weight | body size | property flags | property list...
* +----------+--------+-----------+----------------+------------- - -
* short short long long short remainder...
* @endcode
*
* | Property | Description |
* | ---------- | ----------- |
* |cpContentType | MIME content type |
* | ocpContentEncoding | MIME content encoding |
* | cpHeaders | message header field table |
* | cpDeliveryMode| nonpersistent (1) or persistent (2) |
* | cpPriority | message priority, 0 to 9 |
* | cpCorrelationId | application correlation identifier |
* | cpReplyTo | address to reply to |
* | cpExpiration | message expiration specification |
* | cpMessageId | application message identifier |
* | cpTimestamp | message timestamp |
* | cpType | message type name |
* | cpUserId | creating user id |
* | cpAppId | creating application id |
* | cpClusterID| cluster ID |
*
* Default property:
* @sa setProperty
* @sa property
*/
class Content : public Base
{
friend class QAMQP::QueuePrivate;
public:
/*! Default content frame property
/*
* Default content frame property
*/
enum Property
{
@ -304,48 +303,47 @@ namespace QAMQP
cpAppId = AMQP_BASIC_APP_ID_FLAG,
cpClusterID = AMQP_BASIC_CLUSTER_ID_FLAG
};
Q_DECLARE_FLAGS(Properties, Property);
Q_DECLARE_FLAGS(Properties, Property)
/*!
Content class constructor.
@detailed Construct frame content header class for sending.
/*
* Content class constructor.
* @detailed Construct frame content header class for sending.
*/
Content();
/*!
Content class constructor.
@detailed Construct frame content header class for sending.
@param methodClass Define method class id of constructed frame.
/*
* Content class constructor.
* @detailed Construct frame content header class for sending.
* @param methodClass Define method class id of constructed frame.
*/
Content(MethodClass methodClass);
/*!
Content class constructor.
@detailed Construct frame content header class for sending.
@param raw Data stream for reading source data.
/*
* Content class constructor.
* @detailed Construct frame content header class for sending.
* @param raw Data stream for reading source data.
*/
Content(QDataStream &raw);
/*!
Method class type.
/*
* Method class type.
*/
MethodClass methodClass() const;
qint32 size() const;
/*!
Set default content header property
@param prop Any default content header property
@param value Associated data
/*
* Set default content header property
* @param prop Any default content header property
* @param value Associated data
*/
void setProperty(Property prop, const QVariant &value);
/*!
Return associated with property value
@param prop Any default content header property
/*
* Return associated with property value
* @param prop Any default content header property
*/
QVariant property(Property prop) const;
void setBody(const QByteArray &data);
QByteArray body() const;
qlonglong bodySize() const;
@ -366,8 +364,10 @@ namespace QAMQP
public:
ContentBody();
ContentBody(QDataStream &raw);
void setBody(const QByteArray &data);
QByteArray body() const;
qint32 size() const;
protected:
void writePayload(QDataStream &stream) const;
@ -377,16 +377,16 @@ namespace QAMQP
QByteArray body_;
};
/*!
@brief Class for working with heartbeat frames.
@detailed Implement frame for heartbeat send.
/*
* @brief Class for working with heartbeat frames.
* @detailed Implement frame for heartbeat send.
*/
class Heartbeat : public Base
{
public:
/*!
Heartbeat class constructor.
@detailed Construct frame class for sending.
/*
* Heartbeat class constructor.
* @detailed Construct frame class for sending.
*/
Heartbeat();
@ -415,7 +415,7 @@ namespace QAMQP
}
}
Q_DECLARE_METATYPE(QAMQP::Frame::decimal);
Q_DECLARE_METATYPE(QAMQP::Frame::TableField);
Q_DECLARE_METATYPE(QAMQP::Frame::decimal)
Q_DECLARE_METATYPE(QAMQP::Frame::TableField)
#endif // amqp_frame_h__

View File

@ -1,8 +1,6 @@
#ifndef qamqp_global_h__
#define qamqp_global_h__
#include <QtCore/qglobal.h>
#define QAMQP_P_INCLUDE
#define AMQPSCHEME "amqp"
#define AMQPSSCHEME "amqps"
@ -15,23 +13,6 @@
#define QAMQP_VERSION "0.2.0"
#define AMQP_CONNECTION_FORCED 320
#define P_DECLARE_PRIVATE(Class) \
friend class Class##Private; \
inline Class##Private* pd_func() { return reinterpret_cast<Class##Private *>(this->pd_ptr); } \
inline const Class##Private* pd_func() const { return reinterpret_cast<const Class##Private *>(this->pd_ptr); }
#define P_DECLARE_PUBLIC(Class) \
inline Class* pq_func() { return static_cast<Class *>(this->pq_ptr); } \
inline const Class* pq_func() const { return static_cast<const Class *>(this->pq_ptr); } \
friend class Class;
#define P_D(Class) Class##Private * const d = this->pd_func()
#define P_Q(Class) Class * const q = this->pq_func()
#endif // qamqp_global_h__

View File

@ -0,0 +1,15 @@
#include "amqp_message.h"
using namespace QAMQP;
Message::Message()
{
qDebug("Message create");
leftSize = 0;
deliveryTag = 0;
}
Message::~Message()
{
qDebug("Message release");
}

View File

@ -5,20 +5,14 @@
namespace QAMQP
{
struct Message
{
Message()
{
qDebug("Message create");
leftSize = 0;
deliveryTag = 0;
}
~Message()
{
qDebug("Message release");
}
Message();
virtual ~Message();
typedef QAMQP::Frame::Content::Property MessageProperty;
Q_DECLARE_FLAGS(MessageProperties, MessageProperty);
Q_DECLARE_FLAGS(MessageProperties, MessageProperty)
qlonglong deliveryTag;
QByteArray payload;
@ -30,4 +24,5 @@ namespace QAMQP
};
typedef QSharedPointer<QAMQP::Message> MessagePtr;
}

View File

@ -3,7 +3,8 @@
#include <QTimer>
#include <QtEndian>
QAMQP::Network::Network( QObject * parent /*= 0*/ ):QObject(parent)
QAMQP::Network::Network( QObject * parent)
: QObject(parent)
{
qRegisterMetaType<QAMQP::Frame::Method>("QAMQP::Frame::Method");
@ -19,13 +20,13 @@ QAMQP::Network::~Network()
disconnect();
}
void QAMQP::Network::connectTo( const QString & host, quint32 port )
{
if(!socket_)
void QAMQP::Network::connectTo(const QString & host, quint16 port)
{
if(!socket_) {
qWarning("AMQP: Socket didn't create.");
return;
}
QString h(host);
int p(port);
connect_ = true;
@ -34,8 +35,7 @@ void QAMQP::Network::connectTo( const QString & host, quint32 port )
if (port == 0)
p = lastPort_;
if (isSsl())
{
if (isSsl()) {
#ifndef QT_NO_SSL
static_cast<QSslSocket *>(socket_.data())->connectToHostEncrypted(h, p);
#else
@ -58,19 +58,14 @@ void QAMQP::Network::disconnect()
void QAMQP::Network::error(QAbstractSocket::SocketError socketError)
{
if(timeOut_ == 0)
{
if (timeOut_ == 0) {
timeOut_ = 1000;
} else {
if (timeOut_ < 120000)
{
timeOut_ *= 5;
}
}
Q_UNUSED(socketError);
switch(socketError)
{
switch(socketError) {
case QAbstractSocket::ConnectionRefusedError:
case QAbstractSocket::RemoteHostClosedError:
case QAbstractSocket::SocketTimeoutError:
@ -85,80 +80,60 @@ void QAMQP::Network::error( QAbstractSocket::SocketError socketError )
}
if (autoReconnect_ && connect_)
{
QTimer::singleShot(timeOut_, this, SLOT(connectTo()));
}
}
void QAMQP::Network::readyRead()
{
while(socket_->bytesAvailable() >= Frame::HEADER_SIZE)
{
while (socket_->bytesAvailable() >= Frame::HEADER_SIZE) {
char *headerData = buffer_.data();
socket_->peek(headerData, Frame::HEADER_SIZE);
const quint32 payloadSize = qFromBigEndian<quint32>(*(quint32*)&headerData[3]);
const qint64 readSize = Frame::HEADER_SIZE+payloadSize+Frame::FRAME_END_SIZE;
if(socket_->bytesAvailable() >= readSize)
{
if (socket_->bytesAvailable() >= readSize) {
buffer_.resize(readSize);
socket_->read(buffer_.data(), readSize);
const char* bufferData = buffer_.constData();
const quint8 type = *(quint8*)&bufferData[0];
const quint8 magic = *(quint8*)&bufferData[Frame::HEADER_SIZE+payloadSize];
if (magic != QAMQP::Frame::FRAME_END)
{
qWarning() << "Wrong end frame";
}
QDataStream streamB(&buffer_, QIODevice::ReadOnly);
switch(QAMQP::Frame::Type(type))
{
switch(QAMQP::Frame::Type(type)) {
case QAMQP::Frame::ftMethod:
{
QAMQP::Frame::Method frame(streamB);
if(frame.methodClass() == QAMQP::Frame::fcConnection)
{
if (frame.methodClass() == QAMQP::Frame::fcConnection) {
m_pMethodHandlerConnection->_q_method(frame);
}
else
{
} else {
foreach(Frame::MethodHandler* pMethodHandler, m_methodHandlersByChannel[frame.channel()])
{
pMethodHandler->_q_method(frame);
}
}
}
break;
case QAMQP::Frame::ftHeader:
{
QAMQP::Frame::Content frame(streamB);
foreach(Frame::ContentHandler* pMethodHandler, m_contentHandlerByChannel[frame.channel()])
{
pMethodHandler->_q_content(frame);
}
}
break;
case QAMQP::Frame::ftBody:
{
QAMQP::Frame::ContentBody frame(streamB);
foreach(Frame::ContentBodyHandler* pMethodHandler, m_bodyHandlersByChannel[frame.channel()])
{
pMethodHandler->_q_body(frame);
}
}
break;
case QAMQP::Frame::ftHeartbeat:
{
qDebug("AMQP: Heartbeat");
}
break;
default:
qWarning() << "AMQP: Unknown frame type: " << type;
}
}
else
{
} else {
break;
}
}
@ -166,8 +141,7 @@ void QAMQP::Network::readyRead()
void QAMQP::Network::sendFrame(const QAMQP::Frame::Base &frame)
{
if(socket_->state() == QAbstractSocket::ConnectedState)
{
if (socket_->state() == QAbstractSocket::ConnectedState) {
QDataStream stream(socket_);
frame.toStream(stream);
}
@ -176,9 +150,7 @@ void QAMQP::Network::sendFrame( const QAMQP::Frame::Base & frame )
bool QAMQP::Network::isSsl() const
{
if (socket_)
{
return QString(socket_->metaObject()->className()).compare("QSslSocket", Qt::CaseInsensitive) == 0;
}
return false;
}
@ -187,19 +159,19 @@ void QAMQP::Network::setSsl( bool value )
initSocket(value);
}
void QAMQP::Network::initSocket( bool ssl /*= false*/ )
void QAMQP::Network::initSocket(bool ssl)
{
if(socket_)
delete socket_;
if (socket_) {
socket_->deleteLater();
socket_ = 0;
}
if(ssl)
{
if (ssl) {
#ifndef QT_NO_SSL
socket_ = new QSslSocket(this);
QSslSocket * ssl_= static_cast<QSslSocket*> (socket_.data());
ssl_->setProtocol(QSsl::AnyProtocol);
connect(socket_, SIGNAL(sslErrors(const QList<QSslError> &)), this, SLOT(sslErrors()));
connect(socket_, SIGNAL(connected()), this, SLOT(conectionReady()));
#else
qWarning("AMQP: You library has builded with QT_NO_SSL option.");
@ -209,15 +181,14 @@ void QAMQP::Network::initSocket( bool ssl /*= false*/ )
connect(socket_, SIGNAL(connected()), this, SLOT(conectionReady()));
}
if(socket_)
{
if (socket_) {
connect(socket_, SIGNAL(disconnected()), this, SIGNAL(disconnected()));
connect(socket_, SIGNAL(readyRead()), this, SLOT(readyRead()));
connect(socket_, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(error(QAbstractSocket::SocketError)));
connect(socket_, SIGNAL(error(QAbstractSocket::SocketError)),
this, SLOT(error(QAbstractSocket::SocketError)));
}
}
void QAMQP::Network::sslErrors()
{
#ifndef QT_NO_SSL
@ -225,11 +196,11 @@ void QAMQP::Network::sslErrors( )
#endif
}
void QAMQP::Network::conectionReady()
{
emit connected();
Q_EMIT connected();
timeOut_ = 0;
char header_[8] = {'A', 'M', 'Q', 'P', 0,0,9,1};
socket_->write(header_, 8);
}
@ -247,14 +218,10 @@ void QAMQP::Network::setAutoReconnect( bool value )
QAbstractSocket::SocketState QAMQP::Network::state() const
{
if (socket_)
{
return socket_->state();
} else {
return QAbstractSocket::UnconnectedState;
}
}
void QAMQP::Network::setMethodHandlerConnection(Frame::MethodHandler *pMethodHandlerConnection)
{
m_pMethodHandlerConnection = pMethodHandlerConnection;

View File

@ -13,13 +13,11 @@
namespace QAMQP
{
class Network : public QObject
{
Q_OBJECT
Q_DISABLE_COPY(Network)
public:
typedef qint16 Channel;
Network(QObject *parent = 0);
~Network();
@ -36,28 +34,28 @@ namespace QAMQP
QAbstractSocket::SocketState state() const;
typedef qint16 Channel;
void setMethodHandlerConnection(Frame::MethodHandler *pMethodHandlerConnection);
void addMethodHandlerForChannel(Channel channel, Frame::MethodHandler *pHandler);
void addContentHandlerForChannel(Channel channel, Frame::ContentHandler *pHandler);
void addContentBodyHandlerForChannel(Channel channel, Frame::ContentBodyHandler *pHandler);
public slots:
void connectTo(const QString & host = QString(), quint32 port = 0);
void connectTo(const QString &host = QString(), quint16 port = 0);
void error(QAbstractSocket::SocketError socketError);
signals:
Q_SIGNALS:
void connected();
void disconnected();
private slots:
private Q_SLOTS:
void readyRead();
void sslErrors();
void conectionReady();
private:
Q_DISABLE_COPY(Network)
void initSocket(bool ssl = false);
QPointer<QTcpSocket> socket_;
QByteArray buffer_;
@ -73,5 +71,6 @@ namespace QAMQP
QHash<Channel, QList<Frame::ContentHandler*> > m_contentHandlerByChannel;
QHash<Channel, QList<Frame::ContentBodyHandler*> > m_bodyHandlersByChannel;
};
}
#endif // amqp_network_h__

View File

@ -1,5 +1,5 @@
#ifndef qamqp_amqp_p_h__
#define qamqp_amqp_p_h__
#ifndef QAMQP_P_H
#define QAMQP_P_H
#include <QSharedPointer>
@ -10,21 +10,22 @@
namespace QAMQP
{
class Queue;
class Exchange;
class ClientPrivate
{
P_DECLARE_PUBLIC(QAMQP::Client)
public:
ClientPrivate(Client *q);
~ClientPrivate();
void init(QObject *parent);
void init(QObject *parent, const QUrl &connectionString);
void printConnect() const;
void connect();
void disconnect();
void parseCnnString( const QUrl & connectionString);
void parseConnectionString( const QUrl &connectionString);
void sockConnect();
void login();
void setAuth(Authenticator* auth);
@ -41,9 +42,8 @@ namespace QAMQP
bool isSSl() const;
Client * const pq_ptr;
Client * const q_ptr;
Q_DECLARE_PUBLIC(QAMQP::Client)
};

View File

@ -10,34 +10,11 @@ using namespace QAMQP::Frame;
#include <QDataStream>
#include <QFile>
namespace QAMQP
Queue::Queue(int channelNumber, Client *parent)
: Channel(new QueuePrivate(this), parent)
{
struct QueueExceptionCleaner
{
/* this cleans up when the constructor throws an exception */
static inline void cleanup(Queue *that, QueuePrivate *d)
{
#ifdef QT_NO_EXCEPTIONS
Q_UNUSED(that);
Q_UNUSED(d);
#else
Q_UNUSED(that);
Q_UNUSED(d);
#endif
}
};
}
Queue::Queue( int channelNumber, Client * parent /*= 0*/ )
: Channel(new QueuePrivate(this))
{
QT_TRY {
pd_func()->init(channelNumber, parent);
} QT_CATCH(...) {
QueueExceptionCleaner::cleanup(this, pd_func());
QT_RETHROW;
}
Q_D(QAMQP::Queue);
d->init(channelNumber, parent);
}
Queue::~Queue()
@ -47,169 +24,176 @@ Queue::~Queue()
void Queue::onOpen()
{
P_D(Queue);
Q_D(QAMQP::Queue);
if (d->delayedDeclare)
{
d->declare();
}
if(!d->delayedBindings.isEmpty())
{
if (!d->delayedBindings.isEmpty()) {
typedef QPair<QString, QString> BindingPair;
foreach(BindingPair binding, d->delayedBindings)
{
d->bind(binding.first, binding.second);
}
d->delayedBindings.clear();
}
}
void Queue::onClose()
{
pd_func()->remove(true, true);
Q_D(QAMQP::Queue);
d->remove(true, true);
}
Queue::QueueOptions Queue::option() const
{
return pd_func()->options;
Q_D(const QAMQP::Queue);
return d->options;
}
void Queue::setNoAck(bool noAck)
{
pd_func()->noAck = noAck;
Q_D(QAMQP::Queue);
d->noAck = noAck;
}
bool Queue::noAck() const
{
return pd_func()->noAck;
Q_D(const QAMQP::Queue);
return d->noAck;
}
void Queue::declare()
{
P_D(Queue);
Q_D(QAMQP::Queue);
declare(d->name, QueueOptions(Durable | AutoDelete));
}
void Queue::declare(const QString &name, QueueOptions options)
{
P_D(Queue);
Q_D(QAMQP::Queue);
setName(name);
d->options = options;
d->declare();
}
void Queue::remove( bool ifUnused /*= true*/, bool ifEmpty /*= true*/, bool noWait /*= true*/ )
void Queue::remove(bool ifUnused, bool ifEmpty, bool noWait)
{
pd_func()->remove(ifUnused, ifEmpty, noWait);
Q_D(QAMQP::Queue);
d->remove(ifUnused, ifEmpty, noWait);
}
void Queue::purge()
{
pd_func()->purge();
Q_D(QAMQP::Queue);
d->purge();
}
void Queue::bind(const QString &exchangeName, const QString &key)
{
pd_func()->bind(exchangeName, key);
Q_D(QAMQP::Queue);
d->bind(exchangeName, key);
}
void Queue::bind(Exchange *exchange, const QString &key)
{
Q_D(QAMQP::Queue);
if (exchange)
pd_func()->bind(exchange->name(), key);
d->bind(exchange->name(), key);
}
void Queue::unbind(const QString &exchangeName, const QString &key)
{
pd_func()->unbind(exchangeName, key);
Q_D(QAMQP::Queue);
d->unbind(exchangeName, key);
}
void Queue::unbind(Exchange *exchange, const QString &key)
{
Q_D(QAMQP::Queue);
if (exchange)
pd_func()->unbind(exchange->name(), key);
d->unbind(exchange->name(), key);
}
void Queue::_q_content(const Content &frame)
{
pd_func()->_q_content(frame);
Q_D(QAMQP::Queue);
d->_q_content(frame);
}
void Queue::_q_body(const ContentBody &frame)
{
pd_func()->_q_body(frame);
Q_D(QAMQP::Queue);
d->_q_body(frame);
}
QAMQP::MessagePtr Queue::getMessage()
{
return pd_func()->messages_.dequeue();
Q_D(QAMQP::Queue);
return d->messages_.dequeue();
}
bool Queue::hasMessage() const
{
if(pd_func()->messages_.isEmpty())
{
Q_D(const QAMQP::Queue);
if (d->messages_.isEmpty())
return false;
}
const MessagePtr &q = pd_func()->messages_.head();
const MessagePtr &q = d->messages_.head();
return q->leftSize == 0;
}
void Queue::consume(ConsumeOptions options)
{
pd_func()->consume(options);
Q_D(QAMQP::Queue);
d->consume(options);
}
void Queue::setConsumerTag(const QString &consumerTag)
{
pd_func()->consumerTag = consumerTag;
Q_D(QAMQP::Queue);
d->consumerTag = consumerTag;
}
QString Queue::consumerTag() const
{
return pd_func()->consumerTag;
Q_D(const QAMQP::Queue);
return d->consumerTag;
}
void Queue::get()
{
pd_func()->get();
Q_D(QAMQP::Queue);
d->get();
}
void Queue::ack(const MessagePtr &message)
{
pd_func()->ack(message);
Q_D(QAMQP::Queue);
d->ack(message);
}
//////////////////////////////////////////////////////////////////////////
QueuePrivate::QueuePrivate(Queue * q)
:ChannelPrivate(q)
, delayedDeclare(false)
, declared(false)
, noAck(true)
, recievingMessage(false)
: ChannelPrivate(q),
delayedDeclare(false),
declared(false),
noAck(true),
recievingMessage(false)
{
}
QueuePrivate::~QueuePrivate()
{
}
bool QueuePrivate::_q_method(const QAMQP::Frame::Method &frame)
{
Q_Q(QAMQP::Queue);
if (ChannelPrivate::_q_method(frame))
return true;
if(frame.methodClass() == QAMQP::Frame::fcQueue)
{
switch(frame.id())
{
if (frame.methodClass() == QAMQP::Frame::fcQueue) {
switch (frame.id()) {
case miDeclareOk:
declareOk(frame);
break;
@ -228,13 +212,12 @@ bool QueuePrivate::_q_method( const QAMQP::Frame::Method & frame )
default:
break;
}
return true;
}
if(frame.methodClass() == QAMQP::Frame::fcBasic)
{
switch(frame.id())
{
if (frame.methodClass() == QAMQP::Frame::fcBasic) {
switch(frame.id()) {
case bmConsumeOk:
consumeOk(frame);
break;
@ -245,7 +228,7 @@ bool QueuePrivate::_q_method( const QAMQP::Frame::Method & frame )
getOk(frame);
break;
case bmGetEmpty:
QMetaObject::invokeMethod(pq_func(), "empty");
QMetaObject::invokeMethod(q, "empty");
break;
default:
break;
@ -253,14 +236,12 @@ bool QueuePrivate::_q_method( const QAMQP::Frame::Method & frame )
return true;
}
return false;
}
void QueuePrivate::declareOk(const QAMQP::Frame::Method &frame)
{
Q_Q(QAMQP::Queue);
qDebug() << "Declared queue: " << name;
declared = true;
@ -272,11 +253,12 @@ void QueuePrivate::declareOk( const QAMQP::Frame::Method & frame )
stream >> messageCount >> consumerCount;
qDebug("Message count %d\nConsumer count: %d", messageCount, consumerCount);
QMetaObject::invokeMethod(pq_func(), "declared");
QMetaObject::invokeMethod(q, "declared");
}
void QueuePrivate::deleteOk(const QAMQP::Frame::Method &frame)
{
Q_Q(QAMQP::Queue);
qDebug() << "Deleted or purged queue: " << name;
declared = false;
@ -285,26 +267,30 @@ void QueuePrivate::deleteOk( const QAMQP::Frame::Method & frame )
qint32 messageCount = 0;
stream >> messageCount;
qDebug("Message count %d", messageCount);
QMetaObject::invokeMethod(pq_func(), "removed");
QMetaObject::invokeMethod(q, "removed");
}
void QueuePrivate::bindOk( const QAMQP::Frame::Method & )
void QueuePrivate::bindOk(const QAMQP::Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(QAMQP::Queue);
qDebug() << "Binded to queue: " << name;
QMetaObject::invokeMethod(pq_func(), "binded", Q_ARG(bool, true));
QMetaObject::invokeMethod(q, "binded", Q_ARG(bool, true));
}
void QueuePrivate::unbindOk( const QAMQP::Frame::Method & )
void QueuePrivate::unbindOk(const QAMQP::Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(QAMQP::Queue);
qDebug() << "Unbinded queue: " << name;
QMetaObject::invokeMethod(pq_func(), "binded", Q_ARG(bool, false));
QMetaObject::invokeMethod(q, "binded", Q_ARG(bool, false));
}
void QueuePrivate::declare()
{
if(!opened)
{
if (!opened) {
delayedDeclare = true;
return;
}
@ -321,11 +307,9 @@ void QueuePrivate::declare()
frame.setArguments(arguments_);
sendFrame(frame);
delayedDeclare = false;
}
void QueuePrivate::remove( bool ifUnused /*= true*/, bool ifEmpty /*= true*/, bool noWait /*= true*/ )
void QueuePrivate::remove(bool ifUnused, bool ifEmpty, bool noWait)
{
if (!declared)
return;
@ -348,15 +332,12 @@ void QueuePrivate::remove( bool ifUnused /*= true*/, bool ifEmpty /*= true*/, bo
frame.setArguments(arguments_);
sendFrame(frame);
}
void QueuePrivate::purge()
{
if (!opened)
{
return;
}
QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miPurge);
frame.setChannel(number);
@ -367,13 +348,11 @@ void QueuePrivate::purge()
out << qint8(0); // no-wait
frame.setArguments(arguments_);
sendFrame(frame);
}
void QueuePrivate::bind(const QString & exchangeName, const QString &key)
{
if(!opened)
{
if (!opened) {
delayedBindings.append(QPair<QString,QString>(exchangeName, key));
return;
}
@ -391,15 +370,12 @@ void QueuePrivate::bind( const QString & exchangeName, const QString & key )
frame.setArguments(arguments_);
sendFrame(frame);
}
void QueuePrivate::unbind(const QString &exchangeName, const QString &key)
{
if (!opened)
{
return;
}
QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miUnbind);
frame.setChannel(number);
@ -413,16 +389,12 @@ void QueuePrivate::unbind( const QString & exchangeName, const QString & key )
frame.setArguments(arguments_);
sendFrame(frame);
}
void QueuePrivate::get()
{
if (!opened)
{
return;
}
QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmGet);
frame.setChannel(number);
@ -436,7 +408,6 @@ void QueuePrivate::get()
sendFrame(frame);
}
void QueuePrivate::getOk(const QAMQP::Frame::Method &frame)
{
QByteArray data = frame.arguments();
@ -456,13 +427,10 @@ void QueuePrivate::getOk( const QAMQP::Frame::Method & frame )
messages_.enqueue(newMessage);
}
void QueuePrivate::ack(const MessagePtr &Message)
{
if (!opened)
{
return;
}
QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmAck);
frame.setChannel(number);
@ -478,9 +446,7 @@ void QueuePrivate::ack( const MessagePtr & Message )
void QueuePrivate::consume(Queue::ConsumeOptions options)
{
if (!opened)
{
return;
}
QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmConsume);
frame.setChannel(number);
@ -494,13 +460,10 @@ void QueuePrivate::consume( Queue::ConsumeOptions options )
frame.setArguments(arguments_);
sendFrame(frame);
}
void QueuePrivate::consumeOk(const QAMQP::Frame::Method &frame)
{
qDebug() << "Consume ok: " << name;
declared = false;
@ -510,17 +473,13 @@ void QueuePrivate::consumeOk( const QAMQP::Frame::Method & frame )
qDebug("Consumer tag = %s", qPrintable(consumerTag));
}
void QueuePrivate::deliver(const QAMQP::Frame::Method &frame)
{
QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly);
QString consumer_ = readField('s',in).toString();
if (consumer_ != consumerTag)
{
return;
}
qlonglong deliveryTag = readField('L',in).toLongLong();
bool redelivered = readField('t',in).toBool();
@ -534,7 +493,6 @@ void QueuePrivate::deliver( const QAMQP::Frame::Method & frame )
newMessage->exchangeName = exchangeName;
newMessage->deliveryTag = deliveryTag;
messages_.enqueue(newMessage);
}
void QueuePrivate::_q_content(const QAMQP::Frame::Content &frame)
@ -542,37 +500,35 @@ void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame )
Q_ASSERT(frame.channel() == number);
if (frame.channel() != number)
return;
if(messages_.isEmpty())
{
if (messages_.isEmpty()) {
qErrnoWarning("Received content-header without method frame before");
return;
}
MessagePtr &message = messages_.last();
message->leftSize = frame.bodySize();
QHash<int, QVariant>::ConstIterator i;
for (i = frame.properties_.begin(); i != frame.properties_.end(); ++i)
{
message->property[Message::MessageProperty(i.key())]= i.value();
}
}
void QueuePrivate::_q_body(const QAMQP::Frame::ContentBody &frame)
{
Q_Q(QAMQP::Queue);
Q_ASSERT(frame.channel() == number);
if (frame.channel() != number)
return;
if(messages_.isEmpty())
{
if (messages_.isEmpty()) {
qErrnoWarning("Received content-body without method frame before");
return;
}
MessagePtr &message = messages_.last();
message->payload.append(frame.body());
message->leftSize -= frame.body().size();
if (message->leftSize == 0 && messages_.size() == 1)
{
emit pq_func()->messageReceived(pq_func());
}
Q_EMIT q->messageReceived(q);
}

View File

@ -6,6 +6,7 @@
namespace QAMQP
{
class Client;
class ClientPrivate;
class Exchange;
@ -13,20 +14,11 @@ namespace QAMQP
class Queue : public Channel, public Frame::ContentHandler, public Frame::ContentBodyHandler
{
Q_OBJECT
Queue(int channelNumber = -1, Client * parent = 0);
Q_PROPERTY(QueueOptions option READ option );
Q_ENUMS(QueueOptions)
Q_PROPERTY(QueueOptions option READ option)
Q_PROPERTY(QString consumerTag READ consumerTag WRITE setConsumerTag)
Q_PROPERTY(bool noAck READ noAck WRITE setNoAck)
P_DECLARE_PRIVATE(QAMQP::Queue)
Q_DISABLE_COPY(Queue);
friend class ClientPrivate;
protected:
void onOpen();
void onClose();
public:
enum QueueOption {
NoOptions = 0x0,
@ -47,7 +39,6 @@ namespace QAMQP
Q_DECLARE_FLAGS(ConsumeOptions, ConsumeOption)
~Queue();
QueueOptions option() const;
void declare();
@ -80,10 +71,23 @@ namespace QAMQP
void messageReceived(QAMQP::Queue *pQueue);
void empty();
protected:
void onOpen();
void onClose();
private:
Queue(int channelNumber = -1, Client * parent = 0);
void _q_content(const QAMQP::Frame::Content & frame);
void _q_body(const QAMQP::Frame::ContentBody & frame);
Q_DISABLE_COPY(Queue)
Q_DECLARE_PRIVATE(QAMQP::Queue)
friend class ClientPrivate;
};
}
#ifdef QAMQP_P_INCLUDE
# include "amqp_queue_p.h"

View File

@ -7,14 +7,13 @@
namespace QAMQP
{
using namespace QAMQP::Frame;
class QueuePrivate: public ChannelPrivate
{
P_DECLARE_PUBLIC(QAMQP::Queue)
public:
enum MethodId
{
enum MethodId {
METHOD_ID_ENUM(miDeclare, 10),
METHOD_ID_ENUM(miBind, 20),
METHOD_ID_ENUM(miUnbind, 50),
@ -65,6 +64,10 @@ namespace QAMQP
void _q_content(const QAMQP::Frame::Content &frame);
void _q_body(const QAMQP::Frame::ContentBody &frame);
Q_DECLARE_PUBLIC(QAMQP::Queue)
};
}
#endif // amqp_queue_p_h__

View File

@ -24,5 +24,6 @@ SOURCES += $$PWD/amqp.cpp \
$$PWD/amqp_connection.cpp \
$$PWD/amqp_exchange.cpp \
$$PWD/amqp_frame.cpp \
$$PWD/amqp_message.cpp \
$$PWD/amqp_network.cpp \
$$PWD/amqp_queue.cpp \