From 5ca89c228b33ade564218260e48dcde5a668027b Mon Sep 17 00:00:00 2001 From: fuCtor Date: Sun, 29 Jan 2012 07:36:03 -0800 Subject: [PATCH] First commit Implement: * Connection * Channel --- QAMQP.sln | 20 ++ QAMQP.vcproj | 543 ++++++++++++++++++++++++++++++++++++++++ src/QAMQP.h | 233 +++++++++++++++++ src/amqp.cpp | 245 ++++++++++++++++++ src/amqp.h | 66 +++++ src/amqp_channel.cpp | 255 +++++++++++++++++++ src/amqp_channel.h | 53 ++++ src/amqp_channel_p.h | 62 +++++ src/amqp_connection.cpp | 327 ++++++++++++++++++++++++ src/amqp_connection.h | 45 ++++ src/amqp_connection_p.h | 46 ++++ src/amqp_exchange.h | 18 ++ src/amqp_frame.cpp | 461 ++++++++++++++++++++++++++++++++++ src/amqp_frame.h | 97 +++++++ src/amqp_message.h | 0 src/amqp_network.cpp | 108 ++++++++ src/amqp_network.h | 44 ++++ src/amqp_p.h | 38 +++ src/amqp_private.h | 27 ++ src/amqp_queue.h | 0 src/main.cpp | 18 ++ src/qamqp_global.h | 18 ++ 22 files changed, 2724 insertions(+) create mode 100644 QAMQP.sln create mode 100644 QAMQP.vcproj create mode 100644 src/QAMQP.h create mode 100644 src/amqp.cpp create mode 100644 src/amqp.h create mode 100644 src/amqp_channel.cpp create mode 100644 src/amqp_channel.h create mode 100644 src/amqp_channel_p.h create mode 100644 src/amqp_connection.cpp create mode 100644 src/amqp_connection.h create mode 100644 src/amqp_connection_p.h create mode 100644 src/amqp_exchange.h create mode 100644 src/amqp_frame.cpp create mode 100644 src/amqp_frame.h create mode 100644 src/amqp_message.h create mode 100644 src/amqp_network.cpp create mode 100644 src/amqp_network.h create mode 100644 src/amqp_p.h create mode 100644 src/amqp_private.h create mode 100644 src/amqp_queue.h create mode 100644 src/main.cpp create mode 100644 src/qamqp_global.h diff --git a/QAMQP.sln b/QAMQP.sln new file mode 100644 index 0000000..30e4eaf --- /dev/null +++ b/QAMQP.sln @@ -0,0 +1,20 @@ +п»ї +Microsoft Visual Studio Solution File, Format Version 10.00 +# Visual Studio 2008 +Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "QAMQP", "QAMQP.vcproj", "{479207E5-BC66-4954-A7E9-AE8DA7F2D7B9}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Win32 = Debug|Win32 + Release|Win32 = Release|Win32 + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {479207E5-BC66-4954-A7E9-AE8DA7F2D7B9}.Debug|Win32.ActiveCfg = Debug|Win32 + {479207E5-BC66-4954-A7E9-AE8DA7F2D7B9}.Debug|Win32.Build.0 = Debug|Win32 + {479207E5-BC66-4954-A7E9-AE8DA7F2D7B9}.Release|Win32.ActiveCfg = Release|Win32 + {479207E5-BC66-4954-A7E9-AE8DA7F2D7B9}.Release|Win32.Build.0 = Release|Win32 + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection +EndGlobal diff --git a/QAMQP.vcproj b/QAMQP.vcproj new file mode 100644 index 0000000..1c81dfa --- /dev/null +++ b/QAMQP.vcproj @@ -0,0 +1,543 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/QAMQP.h b/src/QAMQP.h new file mode 100644 index 0000000..60b8069 --- /dev/null +++ b/src/QAMQP.h @@ -0,0 +1,233 @@ +/* +* QAMQP.h +* libqamqp +* +* Created by Alexey Shcherbakov on 28.01.2012. +* +*/ +#ifndef QAMQP_h__ +#define QAMQP_h__ + +#define AMQPDEBUG ":5673" + +#define AMQP_AUTODELETE 1 +#define AMQP_DURABLE 2 +#define AMQP_PASSIVE 4 +#define AMQP_MANDATORY 8 +#define AMQP_IMMIDIATE 16 +#define AMQP_IFUNUSED 32 +#define AMQP_EXCLUSIVE 64 +#define AMQP_NOWAIT 128 +#define AMQP_NOACK 256 +#define AMQP_NOLOCAL 512 +#define AMQP_MULTIPLE 1024 + + +#define HEADER_FOOTER_SIZE 8 // 7 bytes up front, then payload, then 1 byte footer + // max lenght (size) of frame + +#include +#include +#include + + +//export AMQP; +namespace QAMQP +{ +class AMQPQueue; + +enum AMQPEvents_e { + AMQP_MESSAGE, AMQP_SIGUSR, AMQP_CANCEL, AMQP_CLOSE_CHANNEL +}; + +class AMQPException { + string message; + int code; +public: + AMQPException(string message); + AMQPException(amqp_rpc_reply_t * res); + + string getMessage(); + uint16_t getReplyCode(); +}; + + + +class AMQPMessage { + + char * data; + string exchange; + string routing_key; + uint32_t delivery_tag; + int message_count; + string consumer_tag; + AMQPQueue * queue; + map headers; + +public : + AMQPMessage(AMQPQueue * queue); + ~AMQPMessage(); + + void setMessage(const char * data); + char * getMessage(); + + void addHeader(string name, amqp_bytes_t * value); + void addHeader(string name, uint64_t * value); + void addHeader(string name, uint8_t * value); + string getHeader(string name); + + void setConsumerTag( amqp_bytes_t consumer_tag); + void setConsumerTag( string consumer_tag); + string getConsumerTag(); + + void setMessageCount(int count); + int getMessageCount(); + + void setExchange(amqp_bytes_t exchange); + void setExchange(string exchange); + string getExchange(); + + void setRoutingKey(amqp_bytes_t routing_key); + void setRoutingKey(string routing_key); + string getRoutingKey(); + + uint32_t getDeliveryTag(); + void setDeliveryTag(uint32_t delivery_tag); + + AMQPQueue * getQueue(); + +}; + + +class AMQPBase { +protected: + string name; + short parms; + amqp_connection_state_t * cnn; + int channelNum; + AMQPMessage * pmessage; + + short opened; + + void checkReply(amqp_rpc_reply_t * res); + void checkClosed(amqp_rpc_reply_t * res); + void openChannel(); + + +public: + ~AMQPBase(); + int getChannelNum(); + void setParam(short param); + string getName(); + void closeChannel(); + void reopen(); + void setName(const char * name); + void setName(string name); +}; + +class AMQPQueue : public AMQPBase { +protected: + map< AMQPEvents_e, int(*)( AMQPMessage * ) > events; + amqp_bytes_t consumer_tag; + uint32_t delivery_tag; + uint32_t count; +public: + AMQPQueue(amqp_connection_state_t * cnn, int channelNum); + AMQPQueue(amqp_connection_state_t * cnn, int channelNum, string name); + + void Declare(); + void Declare(string name); + void Declare(string name, short parms); + + void Delete(); + void Delete(string name); + + void Purge(); + void Purge(string name); + + void Bind(string exchangeName, string key); + + void unBind(string exchangeName, string key); + + void Get(); + void Get(short param); + + void Consume(); + void Consume(short param); + + void Cancel(amqp_bytes_t consumer_tag); + void Cancel(string consumer_tag); + + void Ack(); + void Ack(uint32_t delivery_tag); + + AMQPMessage * getMessage() { + return pmessage; + } + + uint32_t getCount() { + return count; + } + + void setConsumerTag(string consumer_tag); + amqp_bytes_t getConsumerTag(); + + void addEvent( AMQPEvents_e eventType, int (*event)(AMQPMessage*) ); + + ~AMQPQueue(); + +private: + void sendDeclareCommand(); + void sendDeleteCommand(); + void sendPurgeCommand(); + void sendBindCommand(const char * exchange, const char * key); + void sendUnBindCommand(const char * exchange, const char * key); + void sendGetCommand(); + void sendConsumeCommand(); + void sendCancelCommand(); + void sendAckCommand(); + void setHeaders(amqp_basic_properties_t * p); +}; + + +class AMQPExchange : public AMQPBase { + string type; + map sHeaders; + map iHeaders; + +public: + AMQPExchange(amqp_connection_state_t * cnn, int channelNum); + AMQPExchange(amqp_connection_state_t * cnn, int channelNum, string name); + + void Declare(); + void Declare(string name); + void Declare(string name, string type); + void Declare(string name, string type, short parms); + + void Delete(); + void Delete(string name); + + void Bind(string queueName); + void Bind(string queueName, string key); + + void Publish(string message, string key); + + void setHeader(string name, int value); + void setHeader(string name, string value); + +private: + AMQPExchange(); + void checkType(); + void sendDeclareCommand(); + void sendDeleteCommand(); + void sendPublishCommand(); + + void sendBindCommand(const char * queueName, const char * key); + void sendPublishCommand(const char * message, const char * key); + void sendCommand(); + void checkReply(amqp_rpc_reply_t * res); + void checkClosed(amqp_rpc_reply_t * res); + +}; +} +#endif // QAMQP_h__ diff --git a/src/amqp.cpp b/src/amqp.cpp new file mode 100644 index 0000000..5f6bc59 --- /dev/null +++ b/src/amqp.cpp @@ -0,0 +1,245 @@ +#include "amqp.h" +#include "amqp_p.h" + +#include +#include +#include "qamqp_global.h" +#include "amqp_exchange.h" + +using namespace QAMQP; + +struct QAMQP::ClientExceptionCleaner +{ + /* this cleans up when the constructor throws an exception */ + static inline void cleanup(Client *that, ClientPrivate *d) + { +#ifdef QT_NO_EXCEPTIONS + Q_UNUSED(that); + Q_UNUSED(d); +#else + Q_UNUSED(that); + Q_UNUSED(d); +#endif + } +}; + +////////////////////////////////////////////////////////////////////////// + +ClientPrivate::ClientPrivate(int version ) + :QObjectPrivate(version) + , port(AMQPPORT) + , host(QString::fromLatin1(AMQPHOST)) + , virtualHost(QString::fromLatin1(AMQPVHOST)) + , user(QString::fromLatin1(AMQPLOGIN)) + , password(QString::fromLatin1(AMQPPSWD)) +{ + +} + + +ClientPrivate::~ClientPrivate() +{ + +} + +void ClientPrivate::init(QObject * parent) +{ + q_func()->setParent(parent); + network_ = new QAMQP::Network(q_func()); + connection_ = new QAMQP::Connection(q_func()); + + QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)), + connection_, SLOT(_q_method(const QAMQP::Frame::Method &))); + + ClientPrivate::connect(); +} + +void ClientPrivate::init(QObject * parent, const QUrl & con) +{ + Q_Q(QAMQP::Client); + if(con.scheme() == AMQPSCHEME ) + { + q->setPassword(con.password()); + q->setUser(con.userName()); + q->setPort(con.port()); + q->setHost(con.host()); + q->setVirtualHost(con.path()); + } + init(parent); +} + +void ClientPrivate::printConnect() const +{ + QTextStream stream(stdout); + stream << "port = " << port << endl; + stream << "host = " << host << endl; + stream << "vhost = " << virtualHost << endl; + stream << "user = " << user << endl; + stream << "passw = " << password << endl; +} + +void ClientPrivate::connect() +{ + ClientPrivate::sockConnect(); + ClientPrivate::login(); +} + +void ClientPrivate::parseCnnString( const QUrl & connectionString ) +{ + +} + +void ClientPrivate::sockConnect() +{ + network_->connectTo(host, port); +} + +void ClientPrivate::login() +{ + +} + +Exchange * ClientPrivate::createExchange( const QString &name ) +{ + Exchange * exchange_ = new Exchange(q_func()); + QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)), + exchange_, SLOT(_q_method(const QAMQP::Frame::Method &))); + + QObject::connect(connection_, SIGNAL(connected()), exchange_, SLOT(_q_open())); + return exchange_; +} + +Queue * ClientPrivate::createQueue( const QString &name ) +{ + return 0; +} + +////////////////////////////////////////////////////////////////////////// + + +QAMQP::Client::Client( QObject * parent /*= 0*/ ) + : QObject(*new ClientPrivate, 0) +{ + QT_TRY { + d_func()->init(parent); + } QT_CATCH(...) { + ClientExceptionCleaner::cleanup(this, d_func()); + QT_RETHROW; + } +} + +QAMQP::Client::Client( const QUrl & connectionString, QObject * parent /*= 0*/ ) + : QObject(*new ClientPrivate, 0) +{ + QT_TRY { + d_func()->init(parent, connectionString); + } QT_CATCH(...) { + ClientExceptionCleaner::cleanup(this, d_func()); + QT_RETHROW; + } +} + +QAMQP::Client::Client(ClientPrivate &dd, QObject* parent, const QUrl & connectionString) + : QObject(dd, 0) +{ + Q_D(QAMQP::Client); + QT_TRY { + d->init(parent, connectionString); + } QT_CATCH(...) { + ClientExceptionCleaner::cleanup(this, d_func()); + QT_RETHROW; + } +} + +QAMQP::Client::~Client() +{ + QObjectPrivate::clearGuards(this); + QT_TRY { + QEvent e(QEvent::Destroy); + QCoreApplication::sendEvent(this, &e); + } QT_CATCH(const std::exception&) { + // if this fails we can't do anything about it but at least we are not allowed to throw. + } +} + +quint32 QAMQP::Client::port() const +{ + return d_func()->port; +} + +void QAMQP::Client::setPort( quint32 port ) +{ + d_func()->port = port; +} + +QString QAMQP::Client::host() const +{ + return d_func()->host; +} + +void QAMQP::Client::setHost( const QString & host ) +{ + d_func()->host = host; +} + +QString QAMQP::Client::virtualHost() const +{ + return d_func()->virtualHost; +} + +void QAMQP::Client::setVirtualHost( const QString & virtualHost ) +{ + d_func()->virtualHost = virtualHost; +} + +QString QAMQP::Client::user() const +{ + return d_func()->user; +} + +void QAMQP::Client::setUser( const QString & user ) +{ + d_func()->user = user; +} + +QString QAMQP::Client::password() const +{ + return d_func()->password; +} + +void QAMQP::Client::setPassword( const QString & password ) +{ + d_func()->password = password; +} + +void QAMQP::Client::printConnect() const +{ +#ifdef _DEBUG + d_func()->printConnect(); +#endif // _DEBUG +} + +void QAMQP::Client::closeChannel() +{ + +} + +Exchange * QAMQP::Client::createExchange() +{ + return d_func()->createExchange(QString()); +} + +Exchange * QAMQP::Client::createExchange( const QString &name ) +{ + return d_func()->createExchange(name); +} + +Queue * QAMQP::Client::createQueue() +{ + return d_func()->createQueue(QString()); +} + +Queue * QAMQP::Client::createQueue( const QString &name ) +{ + return d_func()->createQueue(name); +} \ No newline at end of file diff --git a/src/amqp.h b/src/amqp.h new file mode 100644 index 0000000..9fd30cb --- /dev/null +++ b/src/amqp.h @@ -0,0 +1,66 @@ +#ifndef qamqp_amqp_h__ +#define qamqp_amqp_h__ + +#include +#include + +namespace QAMQP +{ + class Exchange; + class Queue; + class ClientPrivate; + class ConnectionPrivate; + class Client : public QObject + { + Q_OBJECT + + Q_PROPERTY(quint32 port READ port WRITE setPort); + Q_PROPERTY(QString host READ host WRITE setHost); + Q_PROPERTY(QString virtualHost READ virtualHost WRITE setVirtualHost); + Q_PROPERTY(QString user READ user WRITE setUser); + Q_PROPERTY(QString password READ password WRITE setPassword); + + Q_DECLARE_PRIVATE(QAMQP::Client) + Q_DISABLE_COPY(Client) + friend class ConnectionPrivate; + friend class ChannelPrivate; + + public: + Client(QObject * parent = 0); + Client(const QUrl & connectionString, QObject * parent = 0); + ~Client(); + + void printConnect() const; + void closeChannel(); + + Exchange * createExchange(); + Exchange * createExchange(const QString &name); + + Queue * createQueue(); + Queue * createQueue(const QString &name); + + quint32 port() const; + void setPort(quint32 port); + + QString host() const; + void setHost(const QString & host); + + QString virtualHost() const; + void setVirtualHost(const QString & virtualHost); + + QString user() const; + void setUser(const QString & user); + + QString password() const; + void setPassword(const QString & password); + + protected: + Client(ClientPrivate &d, QObject* parent, const QUrl & connectionString); + + private: + friend struct ClientExceptionCleaner; + //void chanalConnect(); + }; +} + +#endif // qamqp_amqp_h__ \ No newline at end of file diff --git a/src/amqp_channel.cpp b/src/amqp_channel.cpp new file mode 100644 index 0000000..2967e39 --- /dev/null +++ b/src/amqp_channel.cpp @@ -0,0 +1,255 @@ +#include "amqp_channel.h" +#include "amqp_channel_p.h" + +#include "amqp.h" +#include "amqp_p.h" + +#include +#include +#include + +using namespace QAMQP; + +namespace QAMQP +{ + int ChannelPrivate::nextChannelNumber_ = 0; + struct ChannelExceptionCleaner + { + /* this cleans up when the constructor throws an exception */ + static inline void cleanup(Channel *that, ChannelPrivate *d) + { +#ifdef QT_NO_EXCEPTIONS + Q_UNUSED(that); + Q_UNUSED(d); +#else + Q_UNUSED(that); + Q_UNUSED(d); +#endif + } + }; +} + + +////////////////////////////////////////////////////////////////////////// + +QAMQP::Channel::Channel( Client * parent /*= 0*/ ) + : QObject(*new ChannelPrivate, 0) +{ + QT_TRY { + d_func()->init(parent); + } QT_CATCH(...) { + ChannelExceptionCleaner::cleanup(this, d_func()); + QT_RETHROW; + } +} + +QAMQP::Channel::Channel( ChannelPrivate &dd, Client* parent ) + : QObject(dd, 0) +{ + +} +QAMQP::Channel::~Channel() +{ + QObjectPrivate::clearGuards(this); + + QT_TRY { + QEvent e(QEvent::Destroy); + QCoreApplication::sendEvent(this, &e); + } QT_CATCH(const std::exception&) { + // if this fails we can't do anything about it but at least we are not allowed to throw. + } +} + +void QAMQP::Channel::closeChannel() +{ + +} + +void QAMQP::Channel::reopen() +{ + +} + +QString QAMQP::Channel::name() +{ + return d_func()->name; +} + +int QAMQP::Channel::channelNumber() +{ + return d_func()->number; +} + +void QAMQP::Channel::setParam( int param ) +{ + +} + +void QAMQP::Channel::setName( const QString &name ) +{ + d_func()->name = name; +} + +void QAMQP::Channel::stateChanged( int state ) +{ + switch(ChannelPrivate::State(state)) + { + case ChannelPrivate::csOpened: + emit opened(); + break; + case ChannelPrivate::csClosed: + emit closed(); + break; + case ChannelPrivate::csIdle: + emit flowChanged(false); + break; + case ChannelPrivate::csRunning: + emit flowChanged(true); + break; + } +} +////////////////////////////////////////////////////////////////////////// + +ChannelPrivate::ChannelPrivate(int version /* = QObjectPrivateVersion */) + :QObjectPrivate(version), number(++nextChannelNumber_) +{ + +} + +ChannelPrivate::~ChannelPrivate() +{ + +} + +void ChannelPrivate::init(Client * parent) +{ + q_func()->setParent(parent); + client_ = parent; +} + + +void ChannelPrivate::_q_method( const QAMQP::Frame::Method & frame ) +{ + if(frame.methodClass() != QAMQP::Frame::fcChannel + || frame.channel() != number ) + return; + + qDebug("Channel#%d:", number); + + switch(frame.id()) + { + case miOpenOk: + openOk(frame); + break; + case miFlow: + flow(frame); + break; + case miFlowOk: + flowOk(frame); + break; + case miClose: + close(frame); + break; + case miCloseOk: + closeOk(frame); + break; + } +} + +void ChannelPrivate::_q_open() +{ + qDebug("Open channel #%d", number); + open(); +} + + +void ChannelPrivate::sendFrame( const QAMQP::Frame::Base & frame ) +{ + client_->d_func()->network_->sendFrame(frame); +} + + +void ChannelPrivate::open() +{ + QAMQP::Frame::Method frame(QAMQP::Frame::fcChannel, miOpen); + frame.setChannel(number); + QByteArray arguments_; + arguments_.resize(1); + arguments_[0] = 0; + frame.setArguments(arguments_); + sendFrame(frame); +} + + +void ChannelPrivate::flow() +{ + +} + +void ChannelPrivate::flow( const QAMQP::Frame::Method & frame ) +{ + +} +void ChannelPrivate::flowOk() +{ + +} + +void ChannelPrivate::flowOk( const QAMQP::Frame::Method & frame ) +{ + +} + +void ChannelPrivate::close(int code, const QString & text, int classId, int methodId) +{ + QAMQP::Frame::Method frame(QAMQP::Frame::fcChannel, miClose); + QByteArray arguments_; + QDataStream stream(&arguments_, QIODevice::WriteOnly); + + QAMQP::Frame::writeField('s',stream, client_->virtualHost()); + + stream << qint16(code); + QAMQP::Frame::writeField('s', stream, text); + stream << qint16(classId); + stream << qint16(methodId); + + frame.setArguments(arguments_); + client_->d_func()->network_->sendFrame(frame); +} + +void ChannelPrivate::close( const QAMQP::Frame::Method & frame ) +{ + q_func()->stateChanged(csClosed); + + qDebug(">> CLOSE"); + QByteArray data = frame.arguments(); + QDataStream stream(&data, QIODevice::ReadOnly); + qint16 code_ = 0, classId, methodId; + stream >> code_; + QString text(QAMQP::Frame::readField('s', stream).toString()); + stream >> classId; + stream >> methodId; + + qDebug(">> code: %d", code_); + qDebug(">> text: %s", qPrintable(text)); + qDebug(">> class-id: %d", classId); + qDebug(">> method-id: %d", methodId); + +} + +void ChannelPrivate::closeOk() +{ + QAMQP::Frame::Method frame(QAMQP::Frame::fcChannel, miCloseOk); + sendFrame(frame); +} + +void ChannelPrivate::closeOk( const QAMQP::Frame::Method & frame ) +{ + q_func()->stateChanged(csClosed); +} + +void ChannelPrivate::openOk( const QAMQP::Frame::Method & frame ) +{ + qDebug(">> OpenOK"); + q_func()->stateChanged(csOpened); +} \ No newline at end of file diff --git a/src/amqp_channel.h b/src/amqp_channel.h new file mode 100644 index 0000000..6f9c19f --- /dev/null +++ b/src/amqp_channel.h @@ -0,0 +1,53 @@ +#ifndef amqp_channel_h__ +#define amqp_channel_h__ + +#include +#include "qamqp_global.h" +#include "amqp_frame.h" + +namespace QAMQP +{ + class ChannelPrivate; + class Client; + class Channel : public QObject + { + Q_OBJECT + + Q_PROPERTY(int number READ channelNumber); + Q_PROPERTY(QString name READ name WRITE setName); + + Q_DECLARE_PRIVATE(QAMQP::Channel) + Q_DISABLE_COPY(Channel) + public: + ~Channel(); + + void closeChannel(); + void reopen(); + + QString name(); + int channelNumber(); + + void setParam(int param); + void setName(const QString &name); + + signals: + void opened(); + void closed(); + void flowChanged(bool enabled); + + protected: + Channel(Client * parent = 0); + Channel(ChannelPrivate &dd, Client* parent); + + private: + void stateChanged(int state); + friend class ClientPrivate; + Q_PRIVATE_SLOT(d_func(), void _q_open()) + Q_PRIVATE_SLOT(d_func(), void _q_method(const QAMQP::Frame::Method & frame)) + }; +} + +#ifdef QAMQP_P_INCLUDE +# include "amqp_channel_p.h" +#endif +#endif // amqp_channel_h__ \ No newline at end of file diff --git a/src/amqp_channel_p.h b/src/amqp_channel_p.h new file mode 100644 index 0000000..45ecf17 --- /dev/null +++ b/src/amqp_channel_p.h @@ -0,0 +1,62 @@ +#ifndef amqp_channel_p_h__ +#define amqp_channel_p_h__ + +#include + +#define METHOD_ID_ENUM(name, id) name = id, name ## Ok + +namespace QAMQP +{ + class Client; + class ClientPrivate; + class ChannelPrivate : public QObjectPrivate + { + Q_DECLARE_PUBLIC(QAMQP::Channel) + public: + enum MethodId + { + METHOD_ID_ENUM(miOpen, 10), + METHOD_ID_ENUM(miFlow, 20), + METHOD_ID_ENUM(miClose, 40) + }; + + enum State { + csOpened, + csClosed, + csIdle, + csRunning + }; + + ChannelPrivate(int version = QObjectPrivateVersion); + ~ChannelPrivate(); + + void init(Client * parent); + + void open(); + void flow(); + void flowOk(); + void close(int code, const QString & text, int classId, int methodId); + void closeOk(); + + ////////////////////////////////////////////////////////////////////////// + + void openOk(const QAMQP::Frame::Method & frame); + void flow(const QAMQP::Frame::Method & frame); + void flowOk(const QAMQP::Frame::Method & frame); + void close(const QAMQP::Frame::Method & frame); + void closeOk(const QAMQP::Frame::Method & frame); + + void _q_method(const QAMQP::Frame::Method & frame); + void _q_open(); + + void sendFrame(const QAMQP::Frame::Base & frame); + + QPointer client_; + + QString name; + int number; + + static int nextChannelNumber_; + }; +} +#endif // amqp_channel_p_h__ \ No newline at end of file diff --git a/src/amqp_connection.cpp b/src/amqp_connection.cpp new file mode 100644 index 0000000..7b5e60b --- /dev/null +++ b/src/amqp_connection.cpp @@ -0,0 +1,327 @@ +#include "amqp_connection.h" +#include "amqp_connection_p.h" +#include "amqp.h" +#include "amqp_p.h" +#include "amqp_frame.h" + +#include +#include +#include + +using namespace QAMQP; + +namespace QAMQP +{ + struct ConnectionExceptionCleaner + { + /* this cleans up when the constructor throws an exception */ + static inline void cleanup(Connection *that, ConnectionPrivate *d) + { +#ifdef QT_NO_EXCEPTIONS + Q_UNUSED(that); + Q_UNUSED(d); +#else + Q_UNUSED(that); + Q_UNUSED(d); +#endif + } + }; + +} +////////////////////////////////////////////////////////////////////////// + + +ConnectionPrivate::ConnectionPrivate( int version /*= QObjectPrivateVersion*/ ) + :QObjectPrivate(version), closed_(false) +{ + +} + +ConnectionPrivate::~ConnectionPrivate() +{ + +} + +void ConnectionPrivate::init(Client * parent) +{ + q_func()->setParent(parent); + client_ = parent; +} + +void ConnectionPrivate::startOk() +{ + QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miStartOk); + QByteArray arguments_; + QDataStream stream(&arguments_, QIODevice::WriteOnly); + + QAMQP::Frame::TableField clientProperties; + clientProperties["version"] = "0.0.1"; + clientProperties["platform"] = QString("Qt %1").arg(qVersion()); + clientProperties["product"] = "QAMQP"; + QAMQP::Frame::serialize(stream, clientProperties); + + QAMQP::Frame::writeField('s', stream, "AMQPLAIN"); + QAMQP::Frame::TableField response; + response["LOGIN"] = client_->user(); + response["PASSWORD"] = client_->password(); + QAMQP::Frame::serialize(stream, response); + QAMQP::Frame::writeField('s', stream, "en_US"); + + frame.setArguments(arguments_); + + client_->d_func()->network_->sendFrame(frame); + +} + +void ConnectionPrivate::secureOk() +{ + + +} + +void ConnectionPrivate::tuneOk() +{ + QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miTuneOk); + QByteArray arguments_; + QDataStream stream(&arguments_, QIODevice::WriteOnly); + + stream << qint16(0); //channel_max + stream << qint32(FRAME_MAX); //frame_max + stream << qint16(0); //heartbeat + + frame.setArguments(arguments_); + client_->d_func()->network_->sendFrame(frame); +} + +void ConnectionPrivate::open() +{ + QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miOpen); + QByteArray arguments_; + QDataStream stream(&arguments_, QIODevice::WriteOnly); + + QAMQP::Frame::writeField('s',stream, client_->virtualHost()); + + stream << qint8(0); + stream << qint8(0); + + frame.setArguments(arguments_); + client_->d_func()->network_->sendFrame(frame); +} + +void ConnectionPrivate::start( const QAMQP::Frame::Method & frame ) +{ + qDebug(">> Start"); + QByteArray data = frame.arguments(); + QDataStream stream(&data, QIODevice::ReadOnly); + quint8 version_major = 0; + quint8 version_minor = 0; + + stream >> version_major; + stream >> version_minor; + + QAMQP::Frame::TableField table; + QAMQP::Frame::deserialize(stream, table); + + QString mechanisms = QAMQP::Frame::readField('S', stream).toString(); + QString locales = QAMQP::Frame::readField('S', stream).toString(); + + qDebug(">> version_major: %d", version_major); + qDebug(">> version_minor: %d", version_minor); + + QAMQP::Frame::print(table); + + qDebug(">> mechanisms: %s", qPrintable(mechanisms)); + qDebug(">> locales: %s", qPrintable(locales)); + + startOk(); +} + +void ConnectionPrivate::secure( const QAMQP::Frame::Method & frame ) +{ + +} + +void ConnectionPrivate::tune( const QAMQP::Frame::Method & frame ) +{ + qDebug(">> Tune"); + QByteArray data = frame.arguments(); + QDataStream stream(&data, QIODevice::ReadOnly); + + qint16 channel_max = 0, + heartbeat = 0; + qint32 frame_max = 0; + + stream >> channel_max; + stream >> frame_max; + stream >> heartbeat; + qDebug(">> channel_max: %d", channel_max); + qDebug(">> frame_max: %d", frame_max); + qDebug(">> heartbeat: %d", heartbeat); + + tuneOk(); + open(); +} + +void ConnectionPrivate::openOk( const QAMQP::Frame::Method & frame ) +{ + qDebug(">> OpenOK"); + q_func()->openOk(); +} + +void ConnectionPrivate::close( const QAMQP::Frame::Method & frame ) +{ + qDebug(">> CLOSE"); + QByteArray data = frame.arguments(); + QDataStream stream(&data, QIODevice::ReadOnly); + qint16 code_ = 0, classId, methodId; + stream >> code_; + QString text(QAMQP::Frame::readField('s', stream).toString()); + stream >> classId; + stream >> methodId; + + qDebug(">> code: %d", code_); + qDebug(">> text: %s", qPrintable(text)); + qDebug(">> class-id: %d", classId); + qDebug(">> method-id: %d", methodId); +} + +void ConnectionPrivate::close(int code, const QString & text, int classId, int methodId) +{ + QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miClose); + QByteArray arguments_; + QDataStream stream(&arguments_, QIODevice::WriteOnly); + + QAMQP::Frame::writeField('s',stream, client_->virtualHost()); + + stream << qint16(code); + QAMQP::Frame::writeField('s', stream, text); + stream << qint16(classId); + stream << qint16(methodId); + + frame.setArguments(arguments_); + client_->d_func()->network_->sendFrame(frame); +} + +void ConnectionPrivate::closeOk() +{ + QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miCloseOk); + client_->d_func()->network_->sendFrame(frame); +} + +void ConnectionPrivate::closeOk( const QAMQP::Frame::Method & ) +{ + QMetaObject::invokeMethod(q_func(), "disconnected"); +} + +void ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame ) +{ + if(frame.methodClass() != QAMQP::Frame::fcConnection) + return; + + qDebug() << "Connection:"; + + if (closed_) + { + if( frame.id() == miCloseOk) + closeOk(frame); + return; + } + + switch(MethodId(frame.id())) + { + case miStart: + start(frame); + break; + case miSecure: + secure(frame); + break; + case miTune: + tune(frame); + break; + case miOpenOk: + openOk(frame); + break; + case miClose: + close(frame); + break; + case miCloseOk: + closeOk(frame); + break; + default: + qWarning("Unknown method-id %d", frame.id()); + } + +} + +////////////////////////////////////////////////////////////////////////// + +Connection::Connection( Client * parent /*= 0*/ ) + : QObject(*new ConnectionPrivate, 0) +{ + QT_TRY { + d_func()->init(parent); + } QT_CATCH(...) { + ConnectionExceptionCleaner::cleanup(this, d_func()); + QT_RETHROW; + } +} + +Connection::Connection(ConnectionPrivate &dd, Client* parent) +: QObject(dd, 0) +{ + Q_D(QAMQP::Connection); + QT_TRY { + d->init(parent); + } QT_CATCH(...) { + ConnectionExceptionCleaner::cleanup(this, d_func()); + QT_RETHROW; + } +} + +Connection::~Connection() +{ + QObjectPrivate::clearGuards(this); + + QT_TRY { + QEvent e(QEvent::Destroy); + QCoreApplication::sendEvent(this, &e); + } QT_CATCH(const std::exception&) { + // if this fails we can't do anything about it but at least we are not allowed to throw. + } +} + +void Connection::startOk() +{ + d_func()->startOk(); +} + +void Connection::secureOk() +{ + d_func()->secureOk(); +} + +void Connection::tuneOk() +{ + d_func()->tuneOk(); +} + +void Connection::open() +{ + d_func()->open(); +} + +void Connection::close(int code, const QString & text, int classId , int methodId) +{ + d_func()->close(code, text, classId, methodId); +} + +void Connection::closeOk() +{ + d_func()->closeOk(); + emit disconnect(); +} + +void Connection::openOk() +{ + emit connected(); +} \ No newline at end of file diff --git a/src/amqp_connection.h b/src/amqp_connection.h new file mode 100644 index 0000000..eb7a93f --- /dev/null +++ b/src/amqp_connection.h @@ -0,0 +1,45 @@ +#ifndef amqp_connection_h__ +#define amqp_connection_h__ + +#include +#include "amqp_frame.h" +#include "qamqp_global.h" + +namespace QAMQP +{ + class ConnectionPrivate; + class ClientPrivate; + class Client; + class Connection : public QObject + { + Q_OBJECT + Q_DECLARE_PRIVATE(QAMQP::Connection) + Q_DISABLE_COPY(Connection) + Connection(Client * parent = 0); + public: + ~Connection(); + + void startOk(); + void secureOk(); + void tuneOk(); + void open(); + void close(int code, const QString & text, int classId = 0, int methodId = 0); + void closeOk(); + Q_SIGNALS: + void disconnected(); + void connected(); + protected: + Connection(ConnectionPrivate &dd, Client* parent); + private: + void openOk(); + friend class ClientPrivate; + Q_PRIVATE_SLOT(d_func(), void _q_method(const QAMQP::Frame::Method & frame)) + }; +} + +// Include private header so MOC won't complain +#ifdef QAMQP_P_INCLUDE +# include "amqp_connection_p.h" +#endif + +#endif // amqp_connection_h__ \ No newline at end of file diff --git a/src/amqp_connection_p.h b/src/amqp_connection_p.h new file mode 100644 index 0000000..879f95b --- /dev/null +++ b/src/amqp_connection_p.h @@ -0,0 +1,46 @@ +#ifndef amqp_connection_p_h__ +#define amqp_connection_p_h__ + +#include + +#define METHOD_ID_ENUM(name, id) name = id, name ## Ok + +namespace QAMQP +{ + class Client; + class ClientPrivate; + class ConnectionPrivate : public QObjectPrivate + { + Q_DECLARE_PUBLIC(QAMQP::Connection) + public: + enum MethodId + { + METHOD_ID_ENUM(miStart, 10), + METHOD_ID_ENUM(miSecure, 20), + METHOD_ID_ENUM(miTune, 30), + METHOD_ID_ENUM(miOpen, 40), + METHOD_ID_ENUM(miClose, 50) + }; + + ConnectionPrivate(int version = QObjectPrivateVersion); + ~ConnectionPrivate(); + void init(Client * parent); + void startOk(); + void secureOk(); + void tuneOk(); + void open(); + void close(int code, const QString & text, int classId = 0, int methodId = 0); + void closeOk(); + + void start(const QAMQP::Frame::Method & frame); + void secure(const QAMQP::Frame::Method & frame); + void tune(const QAMQP::Frame::Method & frame); + void openOk(const QAMQP::Frame::Method & frame); + void close(const QAMQP::Frame::Method & frame); + void closeOk(const QAMQP::Frame::Method & frame); + void _q_method(const QAMQP::Frame::Method & frame); + QPointer client_; + bool closed_; + }; +} +#endif // amqp_connection_p_h__ \ No newline at end of file diff --git a/src/amqp_exchange.h b/src/amqp_exchange.h new file mode 100644 index 0000000..f908296 --- /dev/null +++ b/src/amqp_exchange.h @@ -0,0 +1,18 @@ +#ifndef amqp_exchange_h__ +#define amqp_exchange_h__ + +#include "amqp_channel.h" +namespace QAMQP +{ + class Client; + class ClientPrivate; + class Exchange : public Channel + { + Q_OBJECT; + Exchange(Client * parent = 0) : Channel(parent) {} + public: + friend class ClientPrivate; + ~Exchange(){} + }; +} +#endif // amqp_exchange_h__ \ No newline at end of file diff --git a/src/amqp_frame.cpp b/src/amqp_frame.cpp new file mode 100644 index 0000000..84470f8 --- /dev/null +++ b/src/amqp_frame.cpp @@ -0,0 +1,461 @@ +#include "amqp_frame.h" +#define AMQP_FRAME_END 0xCE + +#include +#include +#include +#include + +using namespace QAMQP::Frame; +Base::Base( Type type ) :type_(type), channel_(0), size_(0) {} + +Base::Base( QDataStream& raw ){ + readHeader(raw); +} + +Type Base::type() const +{ + return Type(type_); +} + +void Base::setChannel( qint16 channel ) +{ + channel_ = channel; +} + +qint16 Base::channel() const +{ + return channel_; +} + +qint32 Base::size() const +{ + return 0; +} + +void QAMQP::Frame::Base::writeHeader( QDataStream & stream ) const +{ + stream << type_; + stream << channel_; + stream << qint32(size()); + +} + +void QAMQP::Frame::Base::writeEnd( QDataStream & stream ) const +{ + stream << qint8(AMQP_FRAME_END); +} + +void QAMQP::Frame::Base::writePayload( QDataStream & stream ) const{} + +void QAMQP::Frame::Base::readHeader( QDataStream & stream ) +{ + stream >> type_; + stream >> channel_; + stream >> size_; + + /* + stream.readRawData(reinterpret_cast(&type_), sizeof(type_)); + stream.readRawData(reinterpret_cast(&channel_), sizeof(channel_)); + stream.readRawData(reinterpret_cast(&size_), sizeof(size_));*/ + +} + +void QAMQP::Frame::Base::readEnd( QDataStream & stream ) +{ + char end_ = 0; + stream.readRawData(reinterpret_cast(&end_), sizeof(end_)); + if(end_ != AMQP_FRAME_END ) + { + qWarning("Wrong end of frame"); + } +} + +void QAMQP::Frame::Base::readPayload( QDataStream & stream ) +{ + stream.skipRawData(size_); +} + +void QAMQP::Frame::Base::toStream( QDataStream & stream ) const +{ + writeHeader(stream); + writePayload(stream); + writeEnd(stream); +} + +////////////////////////////////////////////////////////////////////////// + + +QAMQP::Frame::Method::Method( MethodClass methodClass, qint16 id ) +: Base(ftMethod), methodClass_(methodClass), id_(id) +{ + +} + +QAMQP::Frame::Method::Method( QDataStream& raw ) +: Base(raw) +{ + readPayload(raw); +} + +QAMQP::Frame::Method::Method(): Base(ftMethod) +{ + +} + +MethodClass QAMQP::Frame::Method::methodClass() const +{ + return MethodClass(methodClass_); +} + +qint16 QAMQP::Frame::Method::id() const +{ + return id_; +} + +qint32 QAMQP::Frame::Method::size() const +{ + return sizeof(id_) + sizeof(methodClass_) + arguments_.size(); +} + +void QAMQP::Frame::Method::setArguments( const QByteArray & data ) +{ + arguments_ = data; +} + +QByteArray QAMQP::Frame::Method::arguments() const +{ + return arguments_; +} + +void QAMQP::Frame::Method::readPayload( QDataStream & stream ) +{ + stream >> methodClass_; + stream >> id_; + + arguments_.resize(size_ - (sizeof(id_) + sizeof(methodClass_))); + stream.readRawData(arguments_.data(), arguments_.size()); +} + +void QAMQP::Frame::Method::writePayload( QDataStream & stream ) const +{ + stream << quint16(methodClass_); + stream << quint16(id_); + stream.writeRawData(arguments_.data(), arguments_.size()); +} + + +////////////////////////////////////////////////////////////////////////// + + +QVariant QAMQP::Frame::readField( qint8 valueType, QDataStream &s ) +{ + QVariant value; + QByteArray tmp; + qint8 nameSize_; + char octet[1] = {0}, octet2[2] = {0}, octet4[4] = {0}, octet8[8] = {0}; + switch(valueType) + { + case 't': + s.readRawData(octet, sizeof(octet)); + value = QVariant::fromValue(*octet > 0); + break; + case 'b': + s.readRawData(octet, sizeof(octet)); + value = QVariant::fromValue(*octet); + break; + case 'B': + s.readRawData(octet, sizeof(octet)); + value = QVariant::fromValue(*octet); + break; + case 'U': + s.readRawData(octet2, sizeof(octet2)); + value = QVariant::fromValue(*reinterpret_cast(octet2)); + break; + case 'u': + s.readRawData(octet2, sizeof(octet2)); + value = QVariant::fromValue(*reinterpret_cast(octet2)); + break; + case 'I': + s.readRawData(octet4, sizeof(octet4)); + value = QVariant::fromValue(*reinterpret_cast(octet4)); + break; + case 'i': + s.readRawData(octet4, sizeof(octet4)); + value = QVariant::fromValue(*reinterpret_cast(octet4)); + break; + case 'L': + s.readRawData(octet8, sizeof(octet8)); + value = QVariant::fromValue(*reinterpret_cast(octet8)); + break; + case 'l': + s.readRawData(octet8, sizeof(octet8)); + value = QVariant::fromValue(*reinterpret_cast(octet8)); + break; + case 'f': + s.readRawData(octet4, sizeof(octet4)); + value = QVariant::fromValue(*reinterpret_cast(octet4)); + break; + case 'd': + s.readRawData(octet8, sizeof(octet8)); + value = QVariant::fromValue(*reinterpret_cast(octet8)); + break; + case 'D': + { + QAMQP::Frame::decimal v; + s >> v.scale; + s >> v.value; + value = QVariant::fromValue(v); + } + break; + case 's': + s >> nameSize_; + tmp.resize(nameSize_); + s.readRawData(tmp.data(), tmp.size()); + value = QString::fromAscii(tmp.data(), nameSize_); + break; + case 'S': + { + quint32 length_ = 0; + s >> length_; + tmp.resize(length_); + } + s.readRawData(tmp.data(), tmp.size()); + value = QString::fromAscii(tmp.data(), tmp.size()); + break; + case 'A': + { + qint32 length_ = 0; + qint8 type = 0; + s >> length_; + QList array_; + for (int i =0; i < length_; ++i) + { + s >> type; + array_ << readField(type, s); + } + value = array_; + } + break; + case 'T': + s.readRawData(octet8, sizeof(octet8)); + value = QDateTime::fromMSecsSinceEpoch(*reinterpret_cast(octet8)); + break; + case 'F': + { + TableField table_; + deserialize(s, table_); + value = table_; + } + break; + case 'V': + break; + default: + qWarning("Unknown field type"); + } + return value; +} + +QDataStream & QAMQP::Frame::deserialize( QDataStream & stream, QAMQP::Frame::TableField & f ) +{ + QByteArray data; + stream >> data; + QDataStream s(&data, QIODevice::ReadOnly); + + while(!s.atEnd()) + { + qint8 valueType = 0; + + QString name = readField('s', s).toString(); + s >> valueType; + f[name] = readField(valueType, s); + } + + return stream; +} + +QDataStream & QAMQP::Frame::serialize( QDataStream & stream, const TableField & f ) +{ + QByteArray data; + QDataStream s(&data, QIODevice::WriteOnly); + TableField::ConstIterator i; + for(i = f.begin(); i != f.end(); ++i) + { + writeField('s', s, i.key()); + writeField(s, i.value()); + } + stream << data; + return stream; +} + +void QAMQP::Frame::print( const TableField & f ) +{ + TableField::ConstIterator i; + for(i = f.begin(); i != f.end(); ++i) + { + switch(i.value().type()) + { + case QVariant::Hash: + qDebug() << "\t" << qPrintable(i.key()) << ": FIELD_TABLE"; + break; + case QVariant::List: + qDebug() << "\t" << qPrintable(i.key()) << ": ARRAY"; + break; + default: + qDebug() << "\t" << qPrintable(i.key()) << ": " << i.value(); + } + } +} + +void QAMQP::Frame::writeField( qint8 valueType, QDataStream &s, const QVariant & value, bool withType ) +{ + QByteArray tmp; + qint8 nameSize_; + if(withType) + s << valueType; // Запишем тип поля + + switch(valueType) + { + case 't': + s << (value.toBool() ? qint8(1) : qint8(0)); + break; + case 'b': + s << qint8(value.toInt()); + break; + case 'B': + s << quint8(value.toUInt()); + break; + case 'U': + s << qint16(value.toInt()); + break; + case 'u': + s << quint16(value.toUInt()); + break; + case 'I': + s << qint32(value.toInt()); + break; + case 'i': + s << quint32(value.toUInt()); + break; + case 'L': + s << qlonglong(value.toLongLong()); + break; + case 'l': + s << qulonglong(value.toULongLong()); + break; + case 'f': + s << value.toFloat(); + break; + case 'd': + s << value.toDouble(); + break; + case 'D': + { + QAMQP::Frame::decimal v(value.value()); + s << v.scale; + s << v.value; + } + break; + case 's': + { + QString str = value.toString(); + s << quint8(str.length()); + s.writeRawData(str.toAscii().data(), str.length()); + } + break; + case 'S': + { + QString str = value.toString(); + s << quint32(str.length()); + s.writeRawData(str.toAscii().data(), str.length()); + } + break; + case 'A': + { + QList array_(value.toList()); + s << quint32(array_.count()); + for (int i =0; i < array_.count(); ++i) + { + writeField(s, array_.at(i)); + } + } + break; + case 'T': + s << qulonglong(value.toDateTime().toMSecsSinceEpoch()); + break; + case 'F': + { + TableField table_(value.toHash()); + serialize(s, table_); + } + break; + case 'V': + break; + default: + qWarning("Unknown field type"); + } +} + +void QAMQP::Frame::writeField( QDataStream &s, const QVariant & value ) +{ + char type = 0; + switch(value.type()) + { + case QVariant::Bool: + type = 't'; + break; + case QVariant::Int: + { + int i = qAbs(value.toInt()); + if(i <= qint8(0xFF)) { + type = 'b'; + } else if(i <= qint16(0xFFFF)) { + type = 'U'; + } else if(i <= qint16(0xFFFFFFFF)) { + type = 'I'; + } + } + break; + case QVariant::UInt: + { + int i = value.toInt(); + if(i <= qint8(0xFF)) { + type = 'B'; + } else if(i <= qint16(0xFFFF)) { + type = 'u'; + } else if(i <= qint16(0xFFFFFFFF)) { + type = 'i'; + } + } + break; + case QVariant::LongLong: + type = 'L'; + break; + case QVariant::ULongLong: + type = 'l'; + break; + case QVariant::String: + /* + { + QString str = value.toString(); + type = str.length() > 255 ? 'S' : 's'; + }*/ + type = 'S'; + break; + case QVariant::DateTime: + type = 'T'; + break; + case QVariant::Double: + type = value.toDouble() > FLT_MAX ? 'd' : 'f'; + break; + case QVariant::Hash: + type = 'F'; + break; + case QVariant::List: + type = 'A'; + break; + } + + if(type) + writeField(type, s, value, true); +} \ No newline at end of file diff --git a/src/amqp_frame.h b/src/amqp_frame.h new file mode 100644 index 0000000..31a916b --- /dev/null +++ b/src/amqp_frame.h @@ -0,0 +1,97 @@ +#ifndef amqp_frame_h__ +#define amqp_frame_h__ + +#include +#include +#include + +namespace QAMQP +{ + namespace Frame + { + enum Type + { + ftMethod = 1, + ftHeader = 2, + ftBody = 3, + ftHeartbeat = 8 + }; + + enum MethodClass + { + fcConnection = 10, + fcChannel = 20, + fcExchange = 40, + fcQueue = 50, + fcBasic = 60, + fcTx = 90, + }; + + struct decimal + { + qint8 scale; + quint32 value; + + }; + Q_DECLARE_METATYPE(QAMQP::Frame::decimal); + + typedef QHash TableField; + Q_DECLARE_METATYPE(QAMQP::Frame::TableField); + + QDataStream & serialize( QDataStream & stream, const QAMQP::Frame::TableField & f ); + QDataStream & deserialize( QDataStream & stream, QAMQP::Frame::TableField & f ); + QVariant readField( qint8 valueType, QDataStream &s ); + void writeField( QDataStream &s, const QVariant & value ); + void writeField( qint8 valueType, QDataStream &s, const QVariant & value, bool withType = false ); + void print( const QAMQP::Frame::TableField & f ); + + class Base + { + public: + Base(Type type); + Base(QDataStream& raw); + Type type() const; + void setChannel(qint16 channel); + qint16 channel() const; + virtual qint32 size() const; + void toStream(QDataStream & stream) const; + protected: + void writeHeader(QDataStream & stream) const; + virtual void writePayload(QDataStream & stream) const; + void writeEnd(QDataStream & stream) const; + + void readHeader(QDataStream & stream); + virtual void readPayload(QDataStream & stream); + void readEnd(QDataStream & stream); + + qint32 size_; + private: + qint8 type_; + qint16 channel_; + + }; + + class Method : public Base + { + public: + Method(); + Method(MethodClass methodClass, qint16 id); + Method(QDataStream& raw); + + MethodClass methodClass() const; + qint16 id() const; + qint32 size() const; + void setArguments(const QByteArray & data); + QByteArray arguments() const; + + protected: + void writePayload(QDataStream & stream) const; + void readPayload(QDataStream & stream); + short methodClass_; + qint16 id_; + QByteArray arguments_; + }; + } +} + +#endif // amqp_frame_h__ \ No newline at end of file diff --git a/src/amqp_message.h b/src/amqp_message.h new file mode 100644 index 0000000..e69de29 diff --git a/src/amqp_network.cpp b/src/amqp_network.cpp new file mode 100644 index 0000000..470c944 --- /dev/null +++ b/src/amqp_network.cpp @@ -0,0 +1,108 @@ +#include "amqp_network.h" + +#include + +QAMQP::Network::Network( QObject * parent /*= 0*/ ) +{ + qRegisterMetaType("QAMQP::Frame::Method"); + + socket_ = new QTcpSocket(this); + buffer_ = new QBuffer(this); + offsetBuf = 0; + leftSize = 0; + + + buffer_->open(QIODevice::ReadWrite); + connect(socket_, SIGNAL(connected()), this, SLOT(connected())); + connect(socket_, SIGNAL(disconnected()), this, SLOT(disconnected())); + connect(socket_, SIGNAL(readyRead()), this, SLOT(readyRead())); + connect(socket_, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(error(QAbstractSocket::SocketError))); +} + +QAMQP::Network::~Network() +{ + disconnect(); +} + +void QAMQP::Network::connectTo( const QString & host, quint32 port ) +{ + socket_->connectToHost(host, port); +} + +void QAMQP::Network::disconnect() +{ + +} + +void QAMQP::Network::connected() +{ + char header_[8] = {'A', 'M', 'Q', 'P', 0,0,9,1}; + socket_->write(header_, 8); +} + +void QAMQP::Network::disconnected() +{ + +} + +void QAMQP::Network::error( QAbstractSocket::SocketError socketError ) +{ + +} + +void QAMQP::Network::readyRead() +{ + QDataStream streamA(socket_); + QDataStream streamB(buffer_); + + /* + Вычитать заголовок, поместить в буфер + вычитать весь фрейм, если фрейм вычитан то кинуть на разбор его + */ + while(!socket_->atEnd()) + { + if(leftSize == 0) // Если ранее прочитан был весь фрейм, то читаем заголовок фрейма + { + lastType_ = 0; + qint16 channel_ = 0; + leftSize = 0; + offsetBuf = 0; + + streamA >> lastType_; + streamB << lastType_; + streamA >> channel_; + streamB << channel_; + streamA >> leftSize; + streamB << leftSize; + leftSize++; // увеличим размер на 1, для захвата конца фрейма + } + + QByteArray data_; + data_.resize(leftSize); + offsetBuf = streamA.readRawData(data_.data(), data_.size()); + leftSize -= offsetBuf; + streamB.writeRawData(data_.data(), offsetBuf); + if(leftSize == 0) + { + buffer_->reset(); + switch(QAMQP::Frame::Type(lastType_)) + { + case QAMQP::Frame::ftMethod: + { + QAMQP::Frame::Method frame(streamB); + emit method(frame); + } + break; + default: + qWarning("Unknown frame type"); + } + buffer_->reset(); + } + } +} + +void QAMQP::Network::sendFrame( const QAMQP::Frame::Base & frame ) +{ + QDataStream stream(socket_); + frame.toStream(stream); +} \ No newline at end of file diff --git a/src/amqp_network.h b/src/amqp_network.h new file mode 100644 index 0000000..dc2c96c --- /dev/null +++ b/src/amqp_network.h @@ -0,0 +1,44 @@ +#ifndef amqp_network_h__ +#define amqp_network_h__ + +#include +#include +#include +#include + +#include "amqp_frame.h" + +namespace QAMQP +{ + class Network : public QObject + { + Q_OBJECT + Q_DISABLE_COPY(Network) + public: + Network(QObject * parent = 0); + ~Network(); + + void connectTo(const QString & host, quint32 port); + void disconnect(); + void sendFrame(); + + void sendFrame(const QAMQP::Frame::Base & frame); + + signals: + void method(const QAMQP::Frame::Method & method); + + private slots: + void connected(); + void disconnected(); + void error( QAbstractSocket::SocketError socketError ); + void readyRead(); + + private: + QPointer socket_; + QPointer buffer_; + int offsetBuf; + int leftSize; + qint8 lastType_; + }; +} +#endif // amqp_network_h__ diff --git a/src/amqp_p.h b/src/amqp_p.h new file mode 100644 index 0000000..31e5369 --- /dev/null +++ b/src/amqp_p.h @@ -0,0 +1,38 @@ +#ifndef qamqp_amqp_p_h__ +#define qamqp_amqp_p_h__ +#include + + +#include "amqp_network.h" +#include "amqp_connection.h" + +namespace QAMQP +{ + class ClientPrivate : public QObjectPrivate + { + Q_DECLARE_PUBLIC(QAMQP::Client) + public: + ClientPrivate(int version = QObjectPrivateVersion); + ~ClientPrivate(); + + void init(QObject * parent); + void init(QObject * parent, const QUrl & connectionString); + void printConnect() const; + void connect(); + void parseCnnString( const QUrl & connectionString); + void sockConnect(); + void login(); + + Exchange * createExchange(const QString &name); + Queue * createQueue(const QString &name); + + quint32 port; + QString host; + QString virtualHost; + QString user; + QString password; + QPointer network_; + QPointer connection_; + }; +} +#endif // amqp_p_h__ diff --git a/src/amqp_private.h b/src/amqp_private.h new file mode 100644 index 0000000..2b326a9 --- /dev/null +++ b/src/amqp_private.h @@ -0,0 +1,27 @@ +#include + +namespace QAMQP +{ + + enum amqp_field_value_kind_t { + AMQP_FIELD_KIND_BOOLEAN = 't', + AMQP_FIELD_KIND_I8 = 'b', + AMQP_FIELD_KIND_U8 = 'B', + AMQP_FIELD_KIND_I16 = 's', + AMQP_FIELD_KIND_U16 = 'u', + AMQP_FIELD_KIND_I32 = 'I', + AMQP_FIELD_KIND_U32 = 'i', + AMQP_FIELD_KIND_I64 = 'l', + AMQP_FIELD_KIND_U64 = 'L', + AMQP_FIELD_KIND_F32 = 'f', + AMQP_FIELD_KIND_F64 = 'd', + AMQP_FIELD_KIND_DECIMAL = 'D', + AMQP_FIELD_KIND_UTF8 = 'S', + AMQP_FIELD_KIND_ARRAY = 'A', + AMQP_FIELD_KIND_TIMESTAMP = 'T', + AMQP_FIELD_KIND_TABLE = 'F', + AMQP_FIELD_KIND_VOID = 'V', + AMQP_FIELD_KIND_BYTES = 'x' + } ; + +} \ No newline at end of file diff --git a/src/amqp_queue.h b/src/amqp_queue.h new file mode 100644 index 0000000..e69de29 diff --git a/src/main.cpp b/src/main.cpp new file mode 100644 index 0000000..c2c3b4e --- /dev/null +++ b/src/main.cpp @@ -0,0 +1,18 @@ + +#include +#include "amqp.h" +#include "amqp_exchange.h" + +int main(int argc, char *argv[]) +{ + QCoreApplication a(argc, argv); + + QUrl con(QString("amqp://guest:16141614@main:5672/")); + QAMQP::Client client(con); + QAMQP::Exchange * exchange_ = client.createExchange(), + *exchange2_ = client.createExchange(); + + client.printConnect(); + + return a.exec(); +} diff --git a/src/qamqp_global.h b/src/qamqp_global.h new file mode 100644 index 0000000..704be3e --- /dev/null +++ b/src/qamqp_global.h @@ -0,0 +1,18 @@ +#ifndef qamqp_global_h__ +#define qamqp_global_h__ + +#include + +#define QAMQP_P_INCLUDE +#define AMQPSCHEME "amqp" +#define AMQPPORT 5672 +#define AMQPHOST "localhost" +#define AMQPVHOST "/" +#define AMQPLOGIN "guest" +#define AMQPPSWD "guest" +#define FRAME_MAX 131072 + +#define AMQP_CONNECTION_FORCED 320 + + +#endif // qamqp_global_h__