[+] Exchange

[+] Queue
[+] Basic.publish
This commit is contained in:
fuCtor 2012-02-12 04:22:10 -08:00
parent 1945970929
commit 3fbb01fef4
30 changed files with 1337 additions and 550 deletions

View File

@ -1,233 +0,0 @@
/*
* QAMQP.h
* libqamqp
*
* Created by Alexey Shcherbakov on 28.01.2012.
*
*/
#ifndef QAMQP_h__
#define QAMQP_h__
#define AMQPDEBUG ":5673"
#define AMQP_AUTODELETE 1
#define AMQP_DURABLE 2
#define AMQP_PASSIVE 4
#define AMQP_MANDATORY 8
#define AMQP_IMMIDIATE 16
#define AMQP_IFUNUSED 32
#define AMQP_EXCLUSIVE 64
#define AMQP_NOWAIT 128
#define AMQP_NOACK 256
#define AMQP_NOLOCAL 512
#define AMQP_MULTIPLE 1024
#define HEADER_FOOTER_SIZE 8 // 7 bytes up front, then payload, then 1 byte footer
// max lenght (size) of frame
#include <QString>
#include <QVector>
#include <QMap>
//export AMQP;
namespace QAMQP
{
class AMQPQueue;
enum AMQPEvents_e {
AMQP_MESSAGE, AMQP_SIGUSR, AMQP_CANCEL, AMQP_CLOSE_CHANNEL
};
class AMQPException {
string message;
int code;
public:
AMQPException(string message);
AMQPException(amqp_rpc_reply_t * res);
string getMessage();
uint16_t getReplyCode();
};
class AMQPMessage {
char * data;
string exchange;
string routing_key;
uint32_t delivery_tag;
int message_count;
string consumer_tag;
AMQPQueue * queue;
map<string,string> headers;
public :
AMQPMessage(AMQPQueue * queue);
~AMQPMessage();
void setMessage(const char * data);
char * getMessage();
void addHeader(string name, amqp_bytes_t * value);
void addHeader(string name, uint64_t * value);
void addHeader(string name, uint8_t * value);
string getHeader(string name);
void setConsumerTag( amqp_bytes_t consumer_tag);
void setConsumerTag( string consumer_tag);
string getConsumerTag();
void setMessageCount(int count);
int getMessageCount();
void setExchange(amqp_bytes_t exchange);
void setExchange(string exchange);
string getExchange();
void setRoutingKey(amqp_bytes_t routing_key);
void setRoutingKey(string routing_key);
string getRoutingKey();
uint32_t getDeliveryTag();
void setDeliveryTag(uint32_t delivery_tag);
AMQPQueue * getQueue();
};
class AMQPBase {
protected:
string name;
short parms;
amqp_connection_state_t * cnn;
int channelNum;
AMQPMessage * pmessage;
short opened;
void checkReply(amqp_rpc_reply_t * res);
void checkClosed(amqp_rpc_reply_t * res);
void openChannel();
public:
~AMQPBase();
int getChannelNum();
void setParam(short param);
string getName();
void closeChannel();
void reopen();
void setName(const char * name);
void setName(string name);
};
class AMQPQueue : public AMQPBase {
protected:
map< AMQPEvents_e, int(*)( AMQPMessage * ) > events;
amqp_bytes_t consumer_tag;
uint32_t delivery_tag;
uint32_t count;
public:
AMQPQueue(amqp_connection_state_t * cnn, int channelNum);
AMQPQueue(amqp_connection_state_t * cnn, int channelNum, string name);
void Declare();
void Declare(string name);
void Declare(string name, short parms);
void Delete();
void Delete(string name);
void Purge();
void Purge(string name);
void Bind(string exchangeName, string key);
void unBind(string exchangeName, string key);
void Get();
void Get(short param);
void Consume();
void Consume(short param);
void Cancel(amqp_bytes_t consumer_tag);
void Cancel(string consumer_tag);
void Ack();
void Ack(uint32_t delivery_tag);
AMQPMessage * getMessage() {
return pmessage;
}
uint32_t getCount() {
return count;
}
void setConsumerTag(string consumer_tag);
amqp_bytes_t getConsumerTag();
void addEvent( AMQPEvents_e eventType, int (*event)(AMQPMessage*) );
~AMQPQueue();
private:
void sendDeclareCommand();
void sendDeleteCommand();
void sendPurgeCommand();
void sendBindCommand(const char * exchange, const char * key);
void sendUnBindCommand(const char * exchange, const char * key);
void sendGetCommand();
void sendConsumeCommand();
void sendCancelCommand();
void sendAckCommand();
void setHeaders(amqp_basic_properties_t * p);
};
class AMQPExchange : public AMQPBase {
string type;
map<string,string> sHeaders;
map<string, int> iHeaders;
public:
AMQPExchange(amqp_connection_state_t * cnn, int channelNum);
AMQPExchange(amqp_connection_state_t * cnn, int channelNum, string name);
void Declare();
void Declare(string name);
void Declare(string name, string type);
void Declare(string name, string type, short parms);
void Delete();
void Delete(string name);
void Bind(string queueName);
void Bind(string queueName, string key);
void Publish(string message, string key);
void setHeader(string name, int value);
void setHeader(string name, string value);
private:
AMQPExchange();
void checkType();
void sendDeclareCommand();
void sendDeleteCommand();
void sendPublishCommand();
void sendBindCommand(const char * queueName, const char * key);
void sendPublishCommand(const char * message, const char * key);
void sendCommand();
void checkReply(amqp_rpc_reply_t * res);
void checkClosed(amqp_rpc_reply_t * res);
};
}
#endif // QAMQP_h__

View File

@ -1,18 +0,0 @@
#ifndef amqp_exchange_h__
#define amqp_exchange_h__
#include "amqp_channel.h"
namespace QAMQP
{
class Client;
class ClientPrivate;
class Exchange : public Channel
{
Q_OBJECT;
Exchange(Client * parent = 0) : Channel(parent) {}
public:
friend class ClientPrivate;
~Exchange(){}
};
}
#endif // amqp_exchange_h__

View File

@ -1,122 +0,0 @@
#ifndef amqp_frame_h__
#define amqp_frame_h__
#include <QDataStream>
#include <QHash>
#include <QVariant>
#include <QSharedPointer>
namespace QAMQP
{
namespace Frame
{
enum Type
{
ftMethod = 1,
ftHeader = 2,
ftBody = 3,
ftHeartbeat = 8
};
enum MethodClass
{
fcConnection = 10,
fcChannel = 20,
fcExchange = 40,
fcQueue = 50,
fcBasic = 60,
fcTx = 90,
};
enum FieldValueKind {
fkBoolean = 't',
fkI8 = 'b',
fkU8 = 'B',
fkI16 = 'U',
fkU16 = 'u',
fkI32 = 'I',
fkU32 = 'i',
fkI64 = 'l',
fkU64 = 'L',
fkFloat = 'f',
fkDouble = 'd',
fkDecimal = 'D',
fkLongString = 'S',
fkShortString = 's',
fkArray = 'A',
fkTimestamp = 'T',
fkTable = 'F',
fkVoid = 'V',
fkBytes = 'x'
};
struct decimal
{
qint8 scale;
quint32 value;
};
Q_DECLARE_METATYPE(QAMQP::Frame::decimal);
typedef QHash<QString, QVariant> TableField;
Q_DECLARE_METATYPE(QAMQP::Frame::TableField);
QDataStream & serialize( QDataStream & stream, const QAMQP::Frame::TableField & f );
QDataStream & deserialize( QDataStream & stream, QAMQP::Frame::TableField & f );
QVariant readField( FieldValueKind valueType, QDataStream &s );
void writeField( QDataStream &s, const QVariant & value );
void writeField( FieldValueKind valueType, QDataStream &s, const QVariant & value, bool withType = false );
void print( const QAMQP::Frame::TableField & f );
class Base
{
public:
Base(Type type);
Base(QDataStream& raw);
Type type() const;
void setChannel(qint16 channel);
qint16 channel() const;
virtual qint32 size() const;
void toStream(QDataStream & stream) const;
protected:
void writeHeader(QDataStream & stream) const;
virtual void writePayload(QDataStream & stream) const;
void writeEnd(QDataStream & stream) const;
void readHeader(QDataStream & stream);
virtual void readPayload(QDataStream & stream);
void readEnd(QDataStream & stream);
qint32 size_;
private:
qint8 type_;
qint16 channel_;
};
typedef QSharedPointer<QAMQP::Frame::Base> BasePtr;
class Method : public Base
{
public:
Method();
Method(MethodClass methodClass, qint16 id);
Method(QDataStream& raw);
MethodClass methodClass() const;
qint16 id() const;
qint32 size() const;
void setArguments(const QByteArray & data);
QByteArray arguments() const;
protected:
void writePayload(QDataStream & stream) const;
void readPayload(QDataStream & stream);
short methodClass_;
qint16 id_;
QByteArray arguments_;
};
}
}
#endif // amqp_frame_h__

View File

@ -1,51 +0,0 @@
#include "amqp_private.h"
QAMQP::Base::Base( int version /*= QObjectPrivateVersion*/ )
{
outArguments_ = QSharedPointer<QBuffer>(new QBuffer());
outArguments_->open(QIODevice::WriteOnly);
}
QAMQP::Base::~Base()
{
outArguments_.clear();
inFrame_.clear();
outFrame_.clear();
}
void QAMQP::Base::startMethod( int id )
{
outFrame_ = QAMQP::Frame::BasePtr(new QAMQP::Frame::Method(methodClass(), id));
streamOut_.setDevice(outArguments_.data());
}
void QAMQP::Base::writeArgument( QAMQP::Frame::FieldValueKind type, QVariant value )
{
}
void QAMQP::Base::writeArgument( QVariant value )
{
}
void QAMQP::Base::endWrite()
{
}
void QAMQP::Base::startRead( QAMQP::Frame::BasePtr frame )
{
}
QVariant QAMQP::Base::readArgument( QAMQP::Frame::FieldValueKind type )
{
}
void QAMQP::Base::endRead()
{
}

View File

@ -1,38 +0,0 @@
#ifndef amqp_private_h__
#define amqp_private_h__
#include <QtCore/private/qobject_p.h>
#include <QDataStream>
#include <QBuffer>
#include "amqp_frame.h"
namespace QAMQP
{
class Base: public QObjectPrivate
{
public:
Base(int version = QObjectPrivateVersion);
~Base();
void init(QAMQP::Client * client);
virtual QAMQP::Frame::MethodClass methodClass() const = 0;
void startMethod(int id);
void writeArgument(QAMQP::Frame::FieldValueKind type, QVariant value);
void writeArgument(QVariant value);
void endWrite();
void startRead(QAMQP::Frame::BasePtr frame);
QVariant readArgument(QAMQP::Frame::FieldValueKind type);
void endRead();
private:
QAMQP::Frame::BasePtr inFrame_;
QAMQP::Frame::BasePtr outFrame_;
QDataStream streamIn_;
QDataStream streamOut_;
QSharedPointer<QBuffer> outArguments_;
QPointer<Client> client_;
};
}
#endif // amqp_private_h__

View File

View File

@ -1,18 +1,12 @@
#include <QtCore/QCoreApplication>
#include "amqp.h"
#include "amqp_exchange.h"
#include "test.h"
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
QUrl con(QString("amqp://guest:guest@localhost:5672/"));
QAMQP::Client client(con);
QAMQP::Exchange * exchange_ = client.createExchange(),
*exchange2_ = client.createExchange();
client.printConnect();
Test test;
return a.exec();
}

View File

@ -5,6 +5,7 @@
#include <QCoreApplication>
#include "qamqp_global.h"
#include "amqp_exchange.h"
#include "amqp_queue.h"
using namespace QAMQP;
@ -45,27 +46,24 @@ ClientPrivate::~ClientPrivate()
void ClientPrivate::init(QObject * parent)
{
q_func()->setParent(parent);
network_ = new QAMQP::Network(q_func());
connection_ = new QAMQP::Connection(q_func());
if(!network_){
network_ = new QAMQP::Network(q_func());
}
if(!connection_)
{
connection_ = new QAMQP::Connection(q_func());
}
QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)),
connection_, SLOT(_q_method(const QAMQP::Frame::Method &)));
ClientPrivate::connect();
}
void ClientPrivate::init(QObject * parent, const QUrl & con)
{
Q_Q(QAMQP::Client);
if(con.scheme() == AMQPSCHEME )
{
q->setPassword(con.password());
q->setUser(con.userName());
q->setPort(con.port());
q->setHost(con.host());
q->setVirtualHost(con.path());
}
{
parseCnnString(con);
init(parent);
ClientPrivate::connect();
}
void ClientPrivate::printConnect() const
@ -84,13 +82,22 @@ void ClientPrivate::connect()
ClientPrivate::login();
}
void ClientPrivate::parseCnnString( const QUrl & connectionString )
void ClientPrivate::parseCnnString( const QUrl & con )
{
Q_Q(QAMQP::Client);
if(con.scheme() == AMQPSCHEME )
{
q->setPassword(con.password());
q->setUser(con.userName());
q->setPort(con.port());
q->setHost(con.host());
q->setVirtualHost(con.path());
}
}
void ClientPrivate::sockConnect()
{
disconnect();
network_->connectTo(host, port);
}
@ -99,21 +106,35 @@ void ClientPrivate::login()
}
Exchange * ClientPrivate::createExchange( const QString &name )
Exchange * ClientPrivate::createExchange(int channelNumber, const QString &name )
{
Exchange * exchange_ = new Exchange(q_func());
Exchange * exchange_ = new Exchange(channelNumber, q_func());
QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)),
exchange_, SLOT(_q_method(const QAMQP::Frame::Method &)));
QObject::connect(connection_, SIGNAL(connected()), exchange_, SLOT(_q_open()));
exchange_->setName(name);
return exchange_;
}
Queue * ClientPrivate::createQueue( const QString &name )
Queue * ClientPrivate::createQueue(int channelNumber, const QString &name )
{
return 0;
Queue * queue_ = new Queue(channelNumber, q_func());
QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)),
queue_, SLOT(_q_method(const QAMQP::Frame::Method &)));
QObject::connect(connection_, SIGNAL(connected()), queue_, SLOT(_q_open()));
queue_->setName(name);
return queue_;
}
void ClientPrivate::disconnect()
{
network_->QAMQP::Network::disconnect();
}
//////////////////////////////////////////////////////////////////////////
@ -224,22 +245,44 @@ void QAMQP::Client::closeChannel()
}
Exchange * QAMQP::Client::createExchange()
Exchange * QAMQP::Client::createExchange(int channelNumber)
{
return d_func()->createExchange(QString());
return d_func()->createExchange(channelNumber, QString());
}
Exchange * QAMQP::Client::createExchange( const QString &name )
Exchange * QAMQP::Client::createExchange( const QString &name, int channelNumber )
{
return d_func()->createExchange(name);
return d_func()->createExchange(channelNumber, name);
}
Queue * QAMQP::Client::createQueue()
Queue * QAMQP::Client::createQueue(int channelNumber)
{
return d_func()->createQueue(QString());
return d_func()->createQueue(channelNumber, QString());
}
Queue * QAMQP::Client::createQueue( const QString &name )
Queue * QAMQP::Client::createQueue( const QString &name, int channelNumber )
{
return d_func()->createQueue(name);
return d_func()->createQueue(channelNumber, name);
}
void QAMQP::Client::open()
{
return d_func()->connect();
}
void QAMQP::Client::open( const QUrl & connectionString )
{
d_func()->parseCnnString(connectionString);
open();
}
void QAMQP::Client::close()
{
return d_func()->disconnect();
}
void QAMQP::Client::reopen()
{
return d_func()->connect();
return d_func()->disconnect();
}

View File

@ -33,11 +33,11 @@ namespace QAMQP
void printConnect() const;
void closeChannel();
Exchange * createExchange();
Exchange * createExchange(const QString &name);
Exchange * createExchange(int channelNumber = -1);
Exchange * createExchange(const QString &name, int channelNumber = -1);
Queue * createQueue();
Queue * createQueue(const QString &name);
Queue * createQueue(int channelNumber = -1);
Queue * createQueue(const QString &name, int channelNumber = -1);
quint32 port() const;
void setPort(quint32 port);
@ -54,6 +54,11 @@ namespace QAMQP
QString password() const;
void setPassword(const QString & password);
void open();
void open(const QUrl & connectionString);
void close();
void reopen();
protected:
Client(ClientPrivate &d, QObject* parent, const QUrl & connectionString);

View File

@ -32,11 +32,11 @@ namespace QAMQP
//////////////////////////////////////////////////////////////////////////
QAMQP::Channel::Channel( Client * parent /*= 0*/ )
QAMQP::Channel::Channel(int channelNumber /*= -1*/, Client * parent /*= 0*/ )
: QObject(*new ChannelPrivate, 0)
{
QT_TRY {
d_func()->init(parent);
d_func()->init(channelNumber, parent);
} QT_CATCH(...) {
ChannelExceptionCleaner::cleanup(this, d_func());
QT_RETHROW;
@ -62,12 +62,17 @@ QAMQP::Channel::~Channel()
void QAMQP::Channel::closeChannel()
{
Q_D(Channel);
d->needOpen = true;
if(d->opened)
d->close(0, QString(), 0,0);
}
void QAMQP::Channel::reopen()
{
{
closeChannel();
d_func()->open();
}
QString QAMQP::Channel::name()
@ -108,10 +113,28 @@ void QAMQP::Channel::stateChanged( int state )
break;
}
}
bool QAMQP::Channel::isOpened() const
{
return d_func()->opened;
}
void QAMQP::Channel::onOpen()
{
}
void QAMQP::Channel::onClose()
{
}
//////////////////////////////////////////////////////////////////////////
ChannelPrivate::ChannelPrivate(int version /* = QObjectPrivateVersion */)
:QObjectPrivate(version), number(++nextChannelNumber_)
:QObjectPrivate(version)
, number(0)
, opened(false)
, needOpen(true)
{
}
@ -121,8 +144,11 @@ ChannelPrivate::~ChannelPrivate()
}
void ChannelPrivate::init(Client * parent)
void ChannelPrivate::init(int channelNumber, Client * parent)
{
needOpen = channelNumber == -1 ? true : false;
number = channelNumber == -1 ? ++nextChannelNumber_ : channelNumber;
nextChannelNumber_ = qMax(channelNumber, (nextChannelNumber_ + 1));
q_func()->setParent(parent);
client_ = parent;
}
@ -158,7 +184,6 @@ void ChannelPrivate::_q_method( const QAMQP::Frame::Method & frame )
void ChannelPrivate::_q_open()
{
qDebug("Open channel #%d", number);
open();
}
@ -171,6 +196,12 @@ void ChannelPrivate::sendFrame( const QAMQP::Frame::Base & frame )
void ChannelPrivate::open()
{
if(!needOpen)
return;
if(!client_->d_func()->connection_->isConnected())
return;
qDebug("Open channel #%d", number);
QAMQP::Frame::Method frame(QAMQP::Frame::fcChannel, miOpen);
frame.setChannel(number);
QByteArray arguments_;
@ -245,11 +276,18 @@ void ChannelPrivate::closeOk()
void ChannelPrivate::closeOk( const QAMQP::Frame::Method & frame )
{
q_func()->stateChanged(csClosed);
Q_Q(Channel);
q->stateChanged(csClosed);
q->onClose();
opened = false;
}
void ChannelPrivate::openOk( const QAMQP::Frame::Method & frame )
{
Q_Q(Channel);
qDebug(">> OpenOK");
q_func()->stateChanged(csOpened);
opened = true;
q->stateChanged(csOpened);
q->onOpen();
}

View File

@ -21,23 +21,27 @@ namespace QAMQP
public:
~Channel();
void closeChannel();
void closeChannel();
void reopen();
QString name();
int channelNumber();
void setParam(int param);
int channelNumber();
void setParam(int param);
void setName(const QString &name);
bool isOpened() const;
signals:
void opened();
void closed();
void flowChanged(bool enabled);
protected:
Channel(Client * parent = 0);
Channel(int channelNumber = -1, Client * parent = 0);
Channel(ChannelPrivate &dd, Client* parent);
virtual void onOpen();;
virtual void onClose();;
private:
void stateChanged(int state);

View File

@ -27,10 +27,26 @@ namespace QAMQP
csRunning
};
enum BasicMethod
{
METHOD_ID_ENUM(bmQos, 10),
METHOD_ID_ENUM(bmConsume, 20),
METHOD_ID_ENUM(bmCancel, 30),
bmPublish = 40,
bmReturn = 50,
bmDeliver = 60,
METHOD_ID_ENUM(bmGet, 70),
bmgetEmpty = 72,
bmAck = 80,
bmReject = 90,
bmRecoverAsync = 100,
METHOD_ID_ENUM(bmRecover, 110)
};
ChannelPrivate(int version = QObjectPrivateVersion);
~ChannelPrivate();
void init(Client * parent);
void init(int channelNumber, Client * parent);
void open();
void flow();
@ -46,9 +62,10 @@ namespace QAMQP
void close(const QAMQP::Frame::Method & frame);
void closeOk(const QAMQP::Frame::Method & frame);
void _q_method(const QAMQP::Frame::Method & frame);
virtual void _q_method(const QAMQP::Frame::Method & frame);
void _q_open();
void sendFrame(const QAMQP::Frame::Base & frame);
QPointer<Client> client_;
@ -57,6 +74,8 @@ namespace QAMQP
int number;
static int nextChannelNumber_;
bool opened;
bool needOpen;
};
}
#endif // amqp_channel_p_h__

View File

@ -27,12 +27,13 @@ namespace QAMQP
}
};
}
//////////////////////////////////////////////////////////////////////////
ConnectionPrivate::ConnectionPrivate( int version /*= QObjectPrivateVersion*/ )
:QObjectPrivate(version), closed_(false)
:QObjectPrivate(version), closed_(false), connected(false)
{
}
@ -58,20 +59,18 @@ void ConnectionPrivate::startOk()
clientProperties["version"] = "0.0.1";
clientProperties["platform"] = QString("Qt %1").arg(qVersion());
clientProperties["product"] = "QAMQP";
clientProperties["site"] = "http://vmp.ru";
QAMQP::Frame::serialize(stream, clientProperties);
QAMQP::Frame::writeField(QAMQP::Frame::fkShortString, stream, "AMQPLAIN");
QAMQP::Frame::writeField('s', stream, "AMQPLAIN");
QAMQP::Frame::TableField response;
response["LOGIN"] = client_->user();
response["PASSWORD"] = client_->password();
QAMQP::Frame::serialize(stream, response);
QAMQP::Frame::writeField(QAMQP::Frame::fkShortString, stream, "en_US");
QAMQP::Frame::writeField('s', stream, "en_US");
frame.setArguments(arguments_);
client_->d_func()->network_->sendFrame(frame);
client_->d_func()->network_->sendFrame(frame);
}
void ConnectionPrivate::secureOk()
@ -100,7 +99,7 @@ void ConnectionPrivate::open()
QByteArray arguments_;
QDataStream stream(&arguments_, QIODevice::WriteOnly);
QAMQP::Frame::writeField(QAMQP::Frame::fkShortString,stream, client_->virtualHost());
QAMQP::Frame::writeField('s',stream, client_->virtualHost());
stream << qint8(0);
stream << qint8(0);
@ -117,14 +116,13 @@ void ConnectionPrivate::start( const QAMQP::Frame::Method & frame )
quint8 version_major = 0;
quint8 version_minor = 0;
stream >> version_major;
stream >> version_minor;
stream >> version_major >> version_minor;
QAMQP::Frame::TableField table;
QAMQP::Frame::deserialize(stream, table);
QString mechanisms = QAMQP::Frame::readField(QAMQP::Frame::fkLongString, stream).toString();
QString locales = QAMQP::Frame::readField(QAMQP::Frame::fkLongString, stream).toString();
QString mechanisms = QAMQP::Frame::readField('S', stream).toString();
QString locales = QAMQP::Frame::readField('S', stream).toString();
qDebug(">> version_major: %d", version_major);
qDebug(">> version_minor: %d", version_minor);
@ -166,6 +164,7 @@ void ConnectionPrivate::tune( const QAMQP::Frame::Method & frame )
void ConnectionPrivate::openOk( const QAMQP::Frame::Method & frame )
{
qDebug(">> OpenOK");
connected = true;
q_func()->openOk();
}
@ -176,7 +175,7 @@ void ConnectionPrivate::close( const QAMQP::Frame::Method & frame )
QDataStream stream(&data, QIODevice::ReadOnly);
qint16 code_ = 0, classId, methodId;
stream >> code_;
QString text(QAMQP::Frame::readField(QAMQP::Frame::fkShortString, stream).toString());
QString text(QAMQP::Frame::readField('s', stream).toString());
stream >> classId;
stream >> methodId;
@ -184,6 +183,7 @@ void ConnectionPrivate::close( const QAMQP::Frame::Method & frame )
qDebug(">> text: %s", qPrintable(text));
qDebug(">> class-id: %d", classId);
qDebug(">> method-id: %d", methodId);
connected = false;
}
void ConnectionPrivate::close(int code, const QString & text, int classId, int methodId)
@ -192,10 +192,10 @@ void ConnectionPrivate::close(int code, const QString & text, int classId, int m
QByteArray arguments_;
QDataStream stream(&arguments_, QIODevice::WriteOnly);
QAMQP::Frame::writeField(QAMQP::Frame::fkShortString,stream, client_->virtualHost());
QAMQP::Frame::writeField('s',stream, client_->virtualHost());
stream << qint16(code);
QAMQP::Frame::writeField(QAMQP::Frame::fkShortString, stream, text);
QAMQP::Frame::writeField('s', stream, text);
stream << qint16(classId);
stream << qint16(methodId);
@ -205,12 +205,14 @@ void ConnectionPrivate::close(int code, const QString & text, int classId, int m
void ConnectionPrivate::closeOk()
{
QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miCloseOk);
QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miCloseOk);
connected = false;
client_->d_func()->network_->sendFrame(frame);
}
void ConnectionPrivate::closeOk( const QAMQP::Frame::Method & )
{
connected = false;
QMetaObject::invokeMethod(q_func(), "disconnected");
}
@ -325,4 +327,9 @@ void Connection::closeOk()
void Connection::openOk()
{
emit connected();
}
bool Connection::isConnected() const
{
return d_func()->connected;
}

View File

@ -25,6 +25,9 @@ namespace QAMQP
void open();
void close(int code, const QString & text, int classId = 0, int methodId = 0);
void closeOk();
bool isConnected() const;
Q_SIGNALS:
void disconnected();
void connected();

View File

@ -41,6 +41,7 @@ namespace QAMQP
void _q_method(const QAMQP::Frame::Method & frame);
QPointer<Client> client_;
bool closed_;
bool connected;
};
}
#endif // amqp_connection_p_h__

235
src/qamqp/amqp_exchange.cpp Normal file
View File

@ -0,0 +1,235 @@
#include "amqp_exchange.h"
#include "amqp_exchange_p.h"
#include "amqp_queue.h"
using namespace QAMQP;
using namespace QAMQP::Frame;
#include <QCoreApplication>
#include <QDebug>
#include <QDataStream>
namespace QAMQP
{
struct ExchangeExceptionCleaner
{
/* this cleans up when the constructor throws an exception */
static inline void cleanup(Exchange *that, ExchangePrivate *d)
{
#ifdef QT_NO_EXCEPTIONS
Q_UNUSED(that);
Q_UNUSED(d);
#else
Q_UNUSED(that);
Q_UNUSED(d);
#endif
}
};
}
Exchange::Exchange(int channelNumber, Client * parent /*= 0*/ )
: Channel(*new ExchangePrivate, 0)
{
QT_TRY {
d_func()->init(channelNumber, parent);
} QT_CATCH(...) {
ExchangeExceptionCleaner::cleanup(this, d_func());
QT_RETHROW;
}
}
Exchange::~Exchange()
{
remove();
}
void Exchange::onOpen()
{
Q_D(Exchange);
if(d->deleyedDeclare)
{
d->declare();
}
}
void Exchange::onClose()
{
d_func()->remove(true, true);
}
Exchange::ExchangeOptions Exchange::option() const
{
return d_func()->options;
}
QString Exchange::type() const
{
return d_func()->type;
}
void Exchange::declare(const QString &type, ExchangeOptions option , const TableField & arg)
{
Q_D(Exchange);
d->options = option;
d->type = type;
d->arguments = arg;
d->declare();
}
void Exchange::remove( bool ifUnused /*= true*/, bool noWait /*= true*/ )
{
d_func()->remove(ifUnused, noWait);
}
void Exchange::bind( QAMQP::Queue * queue )
{
queue->bind(this, d_func()->name);
}
void Exchange::bind( const QString & queueName )
{
}
void Exchange::bind( const QString & queueName, const QString &key )
{
}
void Exchange::publish( const QString & message, const QString & key )
{
d_func()->publish(message.toUtf8(), key);
}
//////////////////////////////////////////////////////////////////////////
ExchangePrivate::ExchangePrivate()
:ChannelPrivate()
, deleyedDeclare(false)
, declared(false)
{
}
ExchangePrivate::~ExchangePrivate()
{
}
void ExchangePrivate::_q_method( const QAMQP::Frame::Method & frame )
{
ChannelPrivate::_q_method(frame);
if(frame.methodClass() != QAMQP::Frame::fcExchange
|| frame.channel() != number )
return;
switch(frame.id())
{
case miDeclareOk:
declareOk(frame);
break;
case miDelete:
deleteOk(frame);
break;
default:
break;
}
}
void ExchangePrivate::declareOk( const QAMQP::Frame::Method & frame )
{
qDebug() << "Declared exchange: " << name;
QMetaObject::invokeMethod(q_func(), "declared");
declared = true;
}
void ExchangePrivate::deleteOk( const QAMQP::Frame::Method & frame )
{
qDebug() << "Deleted exchange: " << name;
QMetaObject::invokeMethod(q_func(), "removed");
declared = false;
}
void ExchangePrivate::declare( )
{
if(!opened)
{
deleyedDeclare = true;
return;
}
if(name.isEmpty())
return;
QAMQP::Frame::Method frame(QAMQP::Frame::fcExchange, miDeclare);
frame.setChannel(number);
QByteArray arguments_;
QDataStream stream(&arguments_, QIODevice::WriteOnly);
stream << qint16(0); //reserver 1
writeField('s', stream, name);
writeField('s', stream, type);
stream << qint8(options);
writeField('F', stream, ExchangePrivate::arguments);
frame.setArguments(arguments_);
sendFrame(frame);
deleyedDeclare = false;
}
void ExchangePrivate::remove( bool ifUnused /*= true*/, bool noWait /*= true*/ )
{
QAMQP::Frame::Method frame(QAMQP::Frame::fcExchange, miDelete);
frame.setChannel(number);
QByteArray arguments_;
QDataStream stream(&arguments_, QIODevice::WriteOnly);
stream << qint8(0); //reserver 1
writeField('s', stream, name);
qint8 flag = 0;
flag |= (ifUnused ? 0x1 : 0);
flag |= (noWait ? 0x2 : 0);
stream << flag; //reserver 1
frame.setArguments(arguments_);
sendFrame(frame);
}
void ExchangePrivate::publish( const QByteArray & message, const QString & key, const QString &mimeType /*= QString::fromLatin1("text/plain")*/ )
{
QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmPublish);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
writeField('s', out, name);
writeField('s', out, key);
out << qint8(0);
frame.setArguments(arguments_);
sendFrame(frame);
QAMQP::Frame::Content content(QAMQP::Frame::fcBasic);
content.setChannel(number);
content.setProperty(Content::cpContentType, "text/plain");
content.setProperty(Content::cpContentEncoding, "utf-8");
content.setProperty(Content::cpMessageId, "0");
content.setBody(message);
sendFrame(content);
QAMQP::Frame::ContentBody body;
body.setChannel(number);
body.setBody(message);
sendFrame(body);
}

61
src/qamqp/amqp_exchange.h Normal file
View File

@ -0,0 +1,61 @@
#ifndef amqp_exchange_h__
#define amqp_exchange_h__
#include "amqp_channel.h"
namespace QAMQP
{
class Client;
class Queue;
class ClientPrivate;
class ExchangePrivate;
using namespace QAMQP::Frame;
class Exchange : public Channel
{
Q_OBJECT;
Exchange(int channelNumber = -1, Client * parent = 0);
Q_PROPERTY(QString type READ type);
Q_PROPERTY(ExchangeOptions option READ option );
Q_DECLARE_PRIVATE(QAMQP::Exchange)
Q_DISABLE_COPY(Exchange);
friend class ClientPrivate;
protected:
void onOpen();
void onClose();
public:
enum ExchangeOption {
NoOptions = 0x0,
Passive = 0x01,
Durable = 0x02,
AutoDelete = 0x4,
Internal = 0x8,
NoWait = 0x10
};
Q_DECLARE_FLAGS(ExchangeOptions, ExchangeOption)
~Exchange();
QString type() const;
ExchangeOptions option() const;
void declare(const QString &type = QString::fromLatin1("direct"), ExchangeOptions option = NoOptions, const TableField & arg = TableField());
void remove(bool ifUnused = true, bool noWait = true);
void bind(QAMQP::Queue * queue);
void bind(const QString & queueName);
void bind(const QString & queueName, const QString &key);
void publish(const QString & message, const QString & key);
void publish(const QByteArray & message, const QString & key, const QString &mimeType);
Q_SIGNALS:
void declared();
void removed();
};
}
Q_DECLARE_OPERATORS_FOR_FLAGS(QAMQP::Exchange::ExchangeOptions)
#endif // amqp_exchange_h__

View File

@ -0,0 +1,39 @@
#include "amqp_channel_p.h"
#define METHOD_ID_ENUM(name, id) name = id, name ## Ok
namespace QAMQP
{
using namespace QAMQP::Frame;
class ExchangePrivate: public ChannelPrivate
{
Q_DECLARE_PUBLIC(QAMQP::Exchange)
public:
enum MethodId
{
METHOD_ID_ENUM(miDeclare, 10),
METHOD_ID_ENUM(miDelete, 20)
};
ExchangePrivate();
~ExchangePrivate();
void declare();
void remove(bool ifUnused = true, bool noWait = true);
void declareOk(const QAMQP::Frame::Method & frame);
void deleteOk(const QAMQP::Frame::Method & frame);
void publish(const QByteArray & message, const QString & key, const QString &mimeType = QString::fromLatin1("text/plain"));
QString type;
Exchange::ExchangeOptions options;
TableField arguments;
void _q_method(const QAMQP::Frame::Method & frame);
bool deleyedDeclare;
bool declared;
};
}

View File

@ -148,7 +148,7 @@ void QAMQP::Frame::Method::writePayload( QDataStream & stream ) const
//////////////////////////////////////////////////////////////////////////
QVariant QAMQP::Frame::readField( FieldValueKind valueType, QDataStream &s )
QVariant QAMQP::Frame::readField( qint8 valueType, QDataStream &s )
{
QVariant value;
QByteArray tmp;
@ -232,7 +232,7 @@ QVariant QAMQP::Frame::readField( FieldValueKind valueType, QDataStream &s )
for (int i =0; i < length_; ++i)
{
s >> type;
array_ << readField(FieldValueKind(type), s);
array_ << readField(type, s);
}
value = array_;
}
@ -266,9 +266,9 @@ QDataStream & QAMQP::Frame::deserialize( QDataStream & stream, QAMQP::Frame::Tab
{
qint8 valueType = 0;
QString name = readField(fkShortString, s).toString();
QString name = readField('s', s).toString();
s >> valueType;
f[name] = readField(FieldValueKind(valueType), s);
f[name] = readField(valueType, s);
}
return stream;
@ -281,10 +281,16 @@ QDataStream & QAMQP::Frame::serialize( QDataStream & stream, const TableField &
TableField::ConstIterator i;
for(i = f.begin(); i != f.end(); ++i)
{
writeField(fkShortString, s, i.key());
writeField('s', s, i.key());
writeField(s, i.value());
}
stream << data;
if(data.isEmpty())
{
stream << qint32(0);
} else {
stream << data;
}
return stream;
}
@ -307,7 +313,7 @@ void QAMQP::Frame::print( const TableField & f )
}
}
void QAMQP::Frame::writeField( FieldValueKind valueType, QDataStream &s, const QVariant & value, bool withType )
void QAMQP::Frame::writeField( qint8 valueType, QDataStream &s, const QVariant & value, bool withType )
{
QByteArray tmp;
qint8 nameSize_;
@ -457,5 +463,116 @@ void QAMQP::Frame::writeField( QDataStream &s, const QVariant & value )
}
if(type)
writeField(FieldValueKind(type), s, value, true);
writeField(type, s, value, true);
}
//////////////////////////////////////////////////////////////////////////
QAMQP::Frame::Content::Content():Base(ftHeader)
{
}
QAMQP::Frame::Content::Content( MethodClass methodClass ):Base(ftHeader)
{
methodClass_ = methodClass;
}
QAMQP::Frame::Content::Content( QDataStream& raw ):Base(ftHeader)
{
}
QAMQP::Frame::MethodClass QAMQP::Frame::Content::methodClass() const
{
return MethodClass(methodClass_);
}
qint32 QAMQP::Frame::Content::size() const
{
QDataStream out(&buffer_, QIODevice::WriteOnly);
buffer_.clear();
out << qint16(methodClass_);
out << qint16(0); //weight
out << qlonglong(body_.size());
qint16 prop_ = 0;
foreach (int p, properties_.keys())
{
prop_ |= p;
}
out << prop_;
QHash<int, QVariant>::const_iterator i;
for(i = properties_.begin(); i != properties_.end(); ++i)
{
if(i.value().type() == QVariant::String)
{
writeField('s', out, i.value());
} else {
writeField(out, i.value());
}
}
return buffer_.size();
}
void QAMQP::Frame::Content::setBody( const QByteArray & data )
{
body_ = data;
}
QByteArray QAMQP::Frame::Content::body() const
{
return body_;
}
void QAMQP::Frame::Content::setProperty( Property prop, const QVariant & value )
{
properties_[prop] = value;
}
QVariant QAMQP::Frame::Content::property( Property prop ) const
{
return properties_.value(prop);
}
void QAMQP::Frame::Content::writePayload( QDataStream & out ) const
{
out.writeRawData(buffer_.data(), buffer_.size());
}
void QAMQP::Frame::Content::readPayload( QDataStream & in )
{
}
//////////////////////////////////////////////////////////////////////////
ContentBody::ContentBody() : Base(ftBody)
{}
void QAMQP::Frame::ContentBody::setBody( const QByteArray & data )
{
body_ = data;
}
QByteArray QAMQP::Frame::ContentBody::body() const
{
return body_;
}
void QAMQP::Frame::ContentBody::writePayload( QDataStream & out ) const
{
out.writeRawData(body_.data(), body_.size());
}
void QAMQP::Frame::ContentBody::readPayload( QDataStream & in )
{
}
qint32 QAMQP::Frame::ContentBody::size() const
{
return body_.size();
}

171
src/qamqp/amqp_frame.h Normal file
View File

@ -0,0 +1,171 @@
#ifndef amqp_frame_h__
#define amqp_frame_h__
#include <QDataStream>
#include <QHash>
#include <QVariant>
#define AMQP_BASIC_CONTENT_TYPE_FLAG (1 << 15)
#define AMQP_BASIC_CONTENT_ENCODING_FLAG (1 << 7)
#define AMQP_BASIC_HEADERS_FLAG (1 << 13)
#define AMQP_BASIC_DELIVERY_MODE_FLAG (1 << 12)
#define AMQP_BASIC_PRIORITY_FLAG (1 << 11)
#define AMQP_BASIC_CORRELATION_ID_FLAG (1 << 10)
#define AMQP_BASIC_REPLY_TO_FLAG (1 << 9)
#define AMQP_BASIC_EXPIRATION_FLAG (1 << 8)
#define AMQP_BASIC_MESSAGE_ID_FLAG (1 << 14)
#define AMQP_BASIC_TIMESTAMP_FLAG (1 << 6)
#define AMQP_BASIC_TYPE_FLAG (1 << 5)
#define AMQP_BASIC_USER_ID_FLAG (1 << 4)
#define AMQP_BASIC_APP_ID_FLAG (1 << 3)
#define AMQP_BASIC_CLUSTER_ID_FLAG (1 << 2)
namespace QAMQP
{
namespace Frame
{
enum Type
{
ftMethod = 1,
ftHeader = 2,
ftBody = 3,
ftHeartbeat = 8
};
enum MethodClass
{
fcConnection = 10,
fcChannel = 20,
fcExchange = 40,
fcQueue = 50,
fcBasic = 60,
fcTx = 90,
};
struct decimal
{
qint8 scale;
quint32 value;
};
Q_DECLARE_METATYPE(QAMQP::Frame::decimal);
typedef QHash<QString, QVariant> TableField;
Q_DECLARE_METATYPE(QAMQP::Frame::TableField);
QDataStream & serialize( QDataStream & stream, const QAMQP::Frame::TableField & f );
QDataStream & deserialize( QDataStream & stream, QAMQP::Frame::TableField & f );
QVariant readField( qint8 valueType, QDataStream &s );
void writeField( QDataStream &s, const QVariant & value );
void writeField( qint8 valueType, QDataStream &s, const QVariant & value, bool withType = false );
void print( const QAMQP::Frame::TableField & f );
class Base
{
public:
Base(Type type);
Base(QDataStream& raw);
Type type() const;
void setChannel(qint16 channel);
qint16 channel() const;
virtual qint32 size() const;
void toStream(QDataStream & stream) const;
protected:
void writeHeader(QDataStream & stream) const;
virtual void writePayload(QDataStream & stream) const;
void writeEnd(QDataStream & stream) const;
void readHeader(QDataStream & stream);
virtual void readPayload(QDataStream & stream);
void readEnd(QDataStream & stream);
qint32 size_;
private:
qint8 type_;
qint16 channel_;
};
class Method : public Base
{
public:
Method();
Method(MethodClass methodClass, qint16 id);
Method(QDataStream& raw);
MethodClass methodClass() const;
qint16 id() const;
qint32 size() const;
void setArguments(const QByteArray & data);
QByteArray arguments() const;
protected:
void writePayload(QDataStream & stream) const;
void readPayload(QDataStream & stream);
short methodClass_;
qint16 id_;
QByteArray arguments_;
};
class Content : public Base
{
public:
enum Property
{
cpContentType = AMQP_BASIC_CONTENT_TYPE_FLAG,
cpContentEncoding = AMQP_BASIC_CONTENT_ENCODING_FLAG,
cpHeaders = AMQP_BASIC_HEADERS_FLAG,
cpDeliveryMode = AMQP_BASIC_DELIVERY_MODE_FLAG,
cpPriority = AMQP_BASIC_PRIORITY_FLAG,
cpCorrelationId = AMQP_BASIC_CORRELATION_ID_FLAG,
cpReplyTo = AMQP_BASIC_REPLY_TO_FLAG,
cpExpiration = AMQP_BASIC_EXPIRATION_FLAG,
cpMessageId = AMQP_BASIC_MESSAGE_ID_FLAG,
cpTimestamp = AMQP_BASIC_TIMESTAMP_FLAG,
cpType = AMQP_BASIC_TYPE_FLAG,
cpUserId = AMQP_BASIC_USER_ID_FLAG,
cpAppId = AMQP_BASIC_APP_ID_FLAG,
cpClusterID = AMQP_BASIC_CLUSTER_ID_FLAG
};
Content();
Content(MethodClass methodClass);
Content(QDataStream& raw);
MethodClass methodClass() const;
qint32 size() const;
void setProperty(Property prop, const QVariant & value);
QVariant property(Property prop) const;
void setBody(const QByteArray & data);
QByteArray body() const;
protected:
void writePayload(QDataStream & stream) const;
void readPayload(QDataStream & stream);
short methodClass_;
qint16 id_;
QByteArray body_;
mutable QByteArray buffer_;
QHash<int, QVariant> properties_;
};
class ContentBody : public Base
{
public:
ContentBody();
void setBody(const QByteArray & data);
QByteArray body() const;
qint32 size() const;
protected:
void writePayload(QDataStream & stream) const;
void readPayload(QDataStream & stream);
private:
QByteArray body_;
};
}
}
#endif // amqp_frame_h__

View File

@ -31,7 +31,8 @@ void QAMQP::Network::connectTo( const QString & host, quint32 port )
void QAMQP::Network::disconnect()
{
if(socket_)
socket_->abort();
}
void QAMQP::Network::connected()
@ -101,8 +102,8 @@ void QAMQP::Network::readyRead()
}
}
void QAMQP::Network::sendFrame( const QAMQP::Frame::BasePtr &frame )
void QAMQP::Network::sendFrame( const QAMQP::Frame::Base & frame )
{
QDataStream stream(socket_);
frame->toStream(stream);
frame.toStream(stream);
}

View File

@ -5,7 +5,6 @@
#include <QTcpSocket>
#include <QPointer>
#include <QBuffer>
#include <QQueue>
#include "amqp_frame.h"
@ -23,7 +22,7 @@ namespace QAMQP
void disconnect();
void sendFrame();
void sendFrame(const QAMQP::Frame::BasePtr &frame);
void sendFrame(const QAMQP::Frame::Base & frame);
signals:
void method(const QAMQP::Frame::Method & method);
@ -37,7 +36,6 @@ namespace QAMQP
private:
QPointer<QTcpSocket> socket_;
QPointer<QBuffer> buffer_;
QQueue<QAMQP::Frame::BasePtr> outFrames_;
int offsetBuf;
int leftSize;
qint8 lastType_;

View File

@ -19,12 +19,13 @@ namespace QAMQP
void init(QObject * parent, const QUrl & connectionString);
void printConnect() const;
void connect();
void disconnect();
void parseCnnString( const QUrl & connectionString);
void sockConnect();
void login();
Exchange * createExchange(const QString &name);
Queue * createQueue(const QString &name);
Exchange * createExchange(int channelNumber, const QString &name);
Queue * createQueue(int channelNumber, const QString &name);
quint32 port;
QString host;

346
src/qamqp/amqp_queue.cpp Normal file
View File

@ -0,0 +1,346 @@
#include "amqp_queue.h"
#include "amqp_queue_p.h"
#include "amqp_exchange.h"
using namespace QAMQP;
using namespace QAMQP::Frame;
#include <QCoreApplication>
#include <QDebug>
#include <QDataStream>
namespace QAMQP
{
struct QueueExceptionCleaner
{
/* this cleans up when the constructor throws an exception */
static inline void cleanup(Queue *that, QueuePrivate *d)
{
#ifdef QT_NO_EXCEPTIONS
Q_UNUSED(that);
Q_UNUSED(d);
#else
Q_UNUSED(that);
Q_UNUSED(d);
#endif
}
};
}
Queue::Queue( int channelNumber, Client * parent /*= 0*/ )
: Channel(*new QueuePrivate, 0)
{
QT_TRY {
d_func()->init(channelNumber, parent);
} QT_CATCH(...) {
QueueExceptionCleaner::cleanup(this, d_func());
QT_RETHROW;
}
}
Queue::~Queue()
{
remove();
}
void Queue::onOpen()
{
Q_D(Queue);
if(d->deleyedDeclare)
{
d->declare();
}
if(!d->delayedBindings.isEmpty())
{
QMap<QString, QString>::iterator i;
for(i = d->delayedBindings.begin(); i!= d->delayedBindings.end(); ++i )
{
d->bind(i.value(), i.key());
}
d->delayedBindings.clear();
}
}
void Queue::onClose()
{
d_func()->remove(true, true);
}
Queue::QueueOptions Queue::option() const
{
return d_func()->options;
}
void Queue::declare()
{
Q_D(Queue);
declare(d->name, QueueOptions(Durable | AutoDelete));
}
void Queue::declare( const QString &name, QueueOptions options )
{
Q_D(Queue);
setName(name);
d->options = options;
d->declare();
}
void Queue::remove( bool ifUnused /*= true*/, bool ifEmpty /*= true*/, bool noWait /*= true*/ )
{
d_func()->remove(ifUnused, ifEmpty, noWait);
}
void Queue::purge()
{
d_func()->purge();
}
void Queue::bind( const QString & exchangeName, const QString & key )
{
d_func()->bind(exchangeName, key);
}
void Queue::bind( Exchange * exchange, const QString & key )
{
if(exchange)
d_func()->bind(exchange->name(), key);
}
void Queue::unbind( const QString & exchangeName, const QString & key )
{
d_func()->unbind(exchangeName, key);
}
void Queue::unbind( Exchange * exchange, const QString & key )
{
if(exchange)
d_func()->unbind(exchange->name(), key);
}
void Queue::get()
{
}
void Queue::consume()
{
}
void Queue::setConsumerTag( const QString &consumerTag )
{
d_func()->setConsumerTag(consumerTag);
}
QString Queue::consumerTag() const
{
return d_func()->consumerTag;
}
//////////////////////////////////////////////////////////////////////////
QueuePrivate::QueuePrivate()
:ChannelPrivate()
, deleyedDeclare(false)
, declared(false)
{
}
QueuePrivate::~QueuePrivate()
{
}
void QueuePrivate::_q_method( const QAMQP::Frame::Method & frame )
{
ChannelPrivate::_q_method(frame);
if(frame.methodClass() != QAMQP::Frame::fcQueue
|| frame.channel() != number )
return;
switch(frame.id())
{
case miDeclareOk:
declareOk(frame);
break;
case miDelete:
deleteOk(frame);
break;
case miBindOk:
bindOk(frame);
break;
case miUnbindOk:
unbindOk(frame);
break;
case miPurgeOk:
deleteOk(frame);
break;
default:
break;
}
}
void QueuePrivate::declareOk( const QAMQP::Frame::Method & frame )
{
qDebug() << "Declared queue: " << name;
QMetaObject::invokeMethod(q_func(), "declared");
declared = true;
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
name = readField('s', stream).toString();
qint32 messageCount = 0, consumerCount = 0;
stream >> messageCount >> consumerCount;
qDebug("Message count %d\nConsumer count: %d", messageCount, consumerCount);
}
void QueuePrivate::deleteOk( const QAMQP::Frame::Method & frame )
{
qDebug() << "Deleted or purged queue: " << name;
QMetaObject::invokeMethod(q_func(), "removed");
declared = false;
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
qint32 messageCount = 0;
stream >> messageCount;
qDebug("Message count %d", messageCount);
}
void QueuePrivate::bindOk( const QAMQP::Frame::Method & frame )
{
qDebug() << "Binded to queue: " << name;
QMetaObject::invokeMethod(q_func(), "binded", Q_ARG(bool, true));
}
void QueuePrivate::unbindOk( const QAMQP::Frame::Method & frame )
{
qDebug() << "Unbinded queue: " << name;
QMetaObject::invokeMethod(q_func(), "binded", Q_ARG(bool, false));
}
void QueuePrivate::declare()
{
if(!opened)
{
deleyedDeclare = true;
return;
}
QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miDeclare);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
writeField('s', out, name);
out << qint8(options);
writeField('F', out, TableField());
frame.setArguments(arguments_);
sendFrame(frame);
deleyedDeclare = false;
}
void QueuePrivate::remove( bool ifUnused /*= true*/, bool ifEmpty /*= true*/, bool noWait /*= true*/ )
{
if(!declared)
return;
QAMQP::Frame::Method frame(QAMQP::Frame::fcExchange, miDelete);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint8(0); //reserver 1
writeField('s', out, name);
qint8 flag = 0;
flag |= (ifUnused ? 0x1 : 0);
flag |= (ifEmpty ? 0x2 : 0);
flag |= (noWait ? 0x4 : 0);
out << flag;
frame.setArguments(arguments_);
sendFrame(frame);
}
void QueuePrivate::purge()
{
if(!opened)
{
return;
}
QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miBind);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
writeField('s', out, name);
out << qint8(1); // no-wait
frame.setArguments(arguments_);
sendFrame(frame);
}
void QueuePrivate::bind( const QString & exchangeName, const QString & key )
{
if(!opened)
{
delayedBindings[exchangeName] = key;
return;
}
QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miBind);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
writeField('s', out, name);
writeField('s', out, exchangeName);
writeField('s', out, key);
out << qint8(0); // no-wait
writeField('F', out, TableField());
frame.setArguments(arguments_);
sendFrame(frame);
}
void QueuePrivate::unbind( const QString & exchangeName, const QString & key )
{
if(!opened)
{
return;
}
QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miBind);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
writeField('s', out, name);
writeField('s', out, exchangeName);
writeField('s', out, key);
writeField('F', out, TableField());
frame.setArguments(arguments_);
sendFrame(frame);
}
void QueuePrivate::setConsumerTag( const QString &consumerTag )
{
}

65
src/qamqp/amqp_queue.h Normal file
View File

@ -0,0 +1,65 @@
#ifndef amqp_queue_h__
#define amqp_queue_h__
#include "amqp_channel.h"
namespace QAMQP
{
class Client;
class ClientPrivate;
class Exchange;
class QueuePrivate;
class Queue : public Channel
{
Q_OBJECT
Queue(int channelNumber = -1, Client * parent = 0);
Q_PROPERTY(QueueOptions option READ option );
Q_PROPERTY(QString consumerTag READ consumerTag WRITE setConsumerTag)
Q_DECLARE_PRIVATE(QAMQP::Queue)
Q_DISABLE_COPY(Queue);
friend class ClientPrivate;
protected:
void onOpen();
void onClose();
public:
enum QueueOption {
NoOptions = 0x0,
Passive = 0x01,
Durable = 0x02,
Exclusive = 0x4,
AutoDelete = 0x8,
NoWait = 0x10
};
Q_DECLARE_FLAGS(QueueOptions, QueueOption)
~Queue();
QueueOptions option() const;
void declare();
void declare(const QString &name, QueueOptions options);
void remove(bool ifUnused = true, bool ifEmpty = true, bool noWait = true);
void purge();
void bind(const QString & exchangeName, const QString & key);
void bind(Exchange * exchange, const QString & key);
void unbind(const QString & exchangeName, const QString & key);
void unbind(Exchange * exchange, const QString & key);
void get();
void consume();
void setConsumerTag(const QString &consumerTag);
QString consumerTag() const;
Q_SIGNALS:
void declared();
void binded(bool);
void removed();
};
}
#endif // amqp_queue_h__

51
src/qamqp/amqp_queue_p.h Normal file
View File

@ -0,0 +1,51 @@
#include "amqp_channel_p.h"
#define METHOD_ID_ENUM(name, id) name = id, name ## Ok
namespace QAMQP
{
using namespace QAMQP::Frame;
class QueuePrivate: public ChannelPrivate
{
Q_DECLARE_PUBLIC(QAMQP::Queue)
public:
enum MethodId
{
METHOD_ID_ENUM(miDeclare, 10),
METHOD_ID_ENUM(miBind, 20),
METHOD_ID_ENUM(miUnbind, 50),
METHOD_ID_ENUM(miPurge, 30),
METHOD_ID_ENUM(miDelete, 40)
};
QueuePrivate();
~QueuePrivate();
void declare();
void remove(bool ifUnused = true, bool ifEmpty = true, bool noWait = true);
void purge();
void bind(const QString & exchangeName, const QString & key);
void unbind(const QString & exchangeName, const QString & key);
void setConsumerTag( const QString &consumerTag );
void declareOk(const QAMQP::Frame::Method & frame);
void deleteOk(const QAMQP::Frame::Method & frame);
void bindOk(const QAMQP::Frame::Method & frame);
void unbindOk(const QAMQP::Frame::Method & frame);
QString type;
Queue::QueueOptions options;
void _q_method(const QAMQP::Frame::Method & frame);
bool deleyedDeclare;
bool declared;
QString consumerTag;
QMap<QString, QString> delayedBindings;
};
}

28
src/test.cpp Normal file
View File

@ -0,0 +1,28 @@
#include "test.h"
Test::Test()
{
QUrl con(QString("amqp://guest:guest@localhost:5672/"));
client_ = new QAMQP::Client(this);
client_->open(con);
exchange_ = client_->createExchange("test.test");
exchange_->declare("direct");
queue_ = client_->createQueue("test.my_queue", exchange_->channelNumber());
queue_->declare();
exchange_->bind(queue_);
connect(queue_, SIGNAL(declared()), this, SLOT(declared()));
}
Test::~Test()
{
}
void Test::declared()
{
qDebug("\t-= Ready =-");
exchange_->publish("test 3432 432 24 23 423 32 23 4324 32 423 423 423", exchange_->name());
}

22
src/test.h Normal file
View File

@ -0,0 +1,22 @@
#include <QObject>
#include "qamqp/amqp.h"
#include "qamqp/amqp_exchange.h"
#include "qamqp/amqp_queue.h"
#include <QPointer>
class Test : public QObject
{
Q_OBJECT
public:
Test();
~Test();
private slots:
void declared();
private:
QPointer<QAMQP::Client> client_;
QPointer<QAMQP::Exchange> exchange_;
QPointer<QAMQP::Queue> queue_;
};