diff --git a/src/qamqp/amqp.cpp b/src/qamqp/amqp.cpp index d917993..c7d5f91 100644 --- a/src/qamqp/amqp.cpp +++ b/src/qamqp/amqp.cpp @@ -49,7 +49,8 @@ ClientPrivate::~ClientPrivate() void ClientPrivate::init(QObject * parent) { pq_func()->setParent(parent); - if(!network_){ + if(!network_) + { network_ = new QAMQP::Network(pq_func()); } @@ -58,10 +59,9 @@ void ClientPrivate::init(QObject * parent) connection_ = new QAMQP::Connection(pq_func()); } - setAuth(new AMQPlainAuthenticator(QString::fromLatin1(AMQPLOGIN), QString::fromLatin1(AMQPPSWD))); + network_->setMethodHandlerConnection(connection_); - QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)), - connection_, SLOT(_q_method(const QAMQP::Frame::Method &))); + setAuth(new AMQPlainAuthenticator(QString::fromLatin1(AMQPLOGIN), QString::fromLatin1(AMQPPSWD))); QObject::connect(connection_, SIGNAL(connected()), pq_func(), SIGNAL(connected())); QObject::connect(connection_, SIGNAL(disconnected()), pq_func(), SIGNAL(disconnected())); @@ -134,32 +134,30 @@ void ClientPrivate::login() Exchange * ClientPrivate::createExchange(int channelNumber, const QString &name ) { Exchange * exchange_ = new Exchange(channelNumber, pq_func()); - QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)), - exchange_, SLOT(_q_method(const QAMQP::Frame::Method &))); + + network_->addMethodHandlerForChannel(exchange_->channelNumber(), exchange_); QObject::connect(connection_, SIGNAL(connected()), exchange_, SLOT(_q_open())); exchange_->pd_func()->open(); QObject::connect(pq_func(), SIGNAL(disconnected()), exchange_, SLOT(_q_disconnected())); exchange_->setName(name); + return exchange_; } Queue * ClientPrivate::createQueue(int channelNumber, const QString &name ) { Queue * queue_ = new Queue(channelNumber, pq_func()); - QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)), - queue_, SLOT(_q_method(const QAMQP::Frame::Method &))); - QObject::connect(network_, SIGNAL(content(const QAMQP::Frame::Content &)), - queue_, SLOT(_q_content(const QAMQP::Frame::Content &))); - - QObject::connect(network_, SIGNAL(body(int, const QByteArray &)), - queue_, SLOT(_q_body(int, const QByteArray &))); + network_->addMethodHandlerForChannel(queue_->channelNumber(), queue_); + network_->addContentHandlerForChannel(queue_->channelNumber(), queue_); + network_->addContentBodyHandlerForChannel(queue_->channelNumber(), queue_); QObject::connect(connection_, SIGNAL(connected()), queue_, SLOT(_q_open())); queue_->pd_func()->open(); QObject::connect(pq_func(), SIGNAL(disconnected()), queue_, SLOT(_q_disconnected())); queue_->setName(name); + return queue_; } diff --git a/src/qamqp/amqp_channel.cpp b/src/qamqp/amqp_channel.cpp index 47de44f..4b33953 100644 --- a/src/qamqp/amqp_channel.cpp +++ b/src/qamqp/amqp_channel.cpp @@ -109,6 +109,11 @@ void QAMQP::Channel::stateChanged( int state ) } } +void QAMQP::Channel::_q_method(const Frame::Method &frame) +{ + pd_func()->_q_method(frame); +} + bool QAMQP::Channel::isOpened() const { return pd_func()->opened; @@ -156,6 +161,7 @@ void ChannelPrivate::init(int channelNumber, Client * parent) bool ChannelPrivate::_q_method( const QAMQP::Frame::Method & frame ) { + Q_ASSERT(frame.channel() == number); if(frame.channel() != number ) return true; diff --git a/src/qamqp/amqp_channel.h b/src/qamqp/amqp_channel.h index ac7aaf6..f30ad9b 100644 --- a/src/qamqp/amqp_channel.h +++ b/src/qamqp/amqp_channel.h @@ -9,7 +9,7 @@ namespace QAMQP { class ChannelPrivate; class Client; - class Channel : public QObject + class Channel : public QObject, public Frame::MethodHandler { Q_OBJECT @@ -47,8 +47,9 @@ namespace QAMQP private: void stateChanged(int state); friend class ClientPrivate; + void _q_method(const QAMQP::Frame::Method & frame); + Q_PRIVATE_SLOT(pd_func(), void _q_open()) - Q_PRIVATE_SLOT(pd_func(), void _q_method(const QAMQP::Frame::Method & frame)) Q_PRIVATE_SLOT(pd_func(), void _q_disconnected()) }; } @@ -56,4 +57,4 @@ namespace QAMQP #ifdef QAMQP_P_INCLUDE # include "amqp_channel_p.h" #endif -#endif // amqp_channel_h__ \ No newline at end of file +#endif // amqp_channel_h__ diff --git a/src/qamqp/amqp_connection.cpp b/src/qamqp/amqp_connection.cpp index ffffdd8..0301742 100644 --- a/src/qamqp/amqp_connection.cpp +++ b/src/qamqp/amqp_connection.cpp @@ -260,6 +260,7 @@ void ConnectionPrivate::setQOS( qint32 prefetchSize, quint16 prefetchCount, int bool ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame ) { + Q_ASSERT(frame.methodClass() == QAMQP::Frame::fcConnection); if(frame.methodClass() != QAMQP::Frame::fcConnection) return true; @@ -364,6 +365,11 @@ void Connection::openOk() emit connected(); } +void Connection::_q_method(const QAMQP::Frame::Method & frame) +{ + pd_func()->_q_method(frame); +} + bool Connection::isConnected() const { return pd_func()->connected; diff --git a/src/qamqp/amqp_connection.h b/src/qamqp/amqp_connection.h index 3ab0e6d..531a1be 100644 --- a/src/qamqp/amqp_connection.h +++ b/src/qamqp/amqp_connection.h @@ -11,7 +11,7 @@ namespace QAMQP class ChannelPrivate; class ClientPrivate; class Client; - class Connection : public QObject + class Connection : public QObject, public Frame::MethodHandler { Q_OBJECT P_DECLARE_PRIVATE(QAMQP::Connection) @@ -45,8 +45,9 @@ namespace QAMQP void openOk(); friend class ClientPrivate; friend class ChannelPrivate; - Q_PRIVATE_SLOT(pd_func(), void _q_method(const QAMQP::Frame::Method & frame)) - Q_PRIVATE_SLOT(pd_func(), void _q_heartbeat()) + + void _q_method(const QAMQP::Frame::Method & frame); + Q_PRIVATE_SLOT(pd_func(), void _q_heartbeat()); }; } @@ -55,4 +56,4 @@ namespace QAMQP # include "amqp_connection_p.h" #endif -#endif // amqp_connection_h__ \ No newline at end of file +#endif // amqp_connection_h__ diff --git a/src/qamqp/amqp_exchange.h b/src/qamqp/amqp_exchange.h index 96ec25f..eb5a0f2 100644 --- a/src/qamqp/amqp_exchange.h +++ b/src/qamqp/amqp_exchange.h @@ -61,4 +61,4 @@ namespace QAMQP }; } Q_DECLARE_OPERATORS_FOR_FLAGS(QAMQP::Exchange::ExchangeOptions) -#endif // amqp_exchange_h__ \ No newline at end of file +#endif // amqp_exchange_h__ diff --git a/src/qamqp/amqp_frame.cpp b/src/qamqp/amqp_frame.cpp index 884a750..7e6b93c 100644 --- a/src/qamqp/amqp_frame.cpp +++ b/src/qamqp/amqp_frame.cpp @@ -1,5 +1,4 @@ #include "amqp_frame.h" -#define AMQP_FRAME_END 0xCE #include #include @@ -47,7 +46,7 @@ void QAMQP::Frame::Base::writeHeader( QDataStream & stream ) const void QAMQP::Frame::Base::writeEnd( QDataStream & stream ) const { - stream << qint8(AMQP_FRAME_END); + stream << qint8(FRAME_END); } void QAMQP::Frame::Base::writePayload( QDataStream & ) const{} @@ -64,7 +63,7 @@ void QAMQP::Frame::Base::readEnd( QDataStream & stream ) { unsigned char end_ = 0; stream.readRawData(reinterpret_cast(&end_), sizeof(end_)); - if(end_ != AMQP_FRAME_END ) + if(end_ != FRAME_END ) { qWarning("Wrong end of frame"); } @@ -711,6 +710,6 @@ qint32 QAMQP::Frame::ContentBody::size() const QAMQP::Frame::Heartbeat::Heartbeat() : Base(ftHeartbeat) {} -void QAMQP::Frame::Heartbeat::readPayload(QDataStream & stream) {} -void QAMQP::Frame::Heartbeat::writePayload(QDataStream & stream) const {} +void QAMQP::Frame::Heartbeat::readPayload(QDataStream & ) {} +void QAMQP::Frame::Heartbeat::writePayload(QDataStream & ) const {} diff --git a/src/qamqp/amqp_frame.h b/src/qamqp/amqp_frame.h index e813b46..8324d23 100644 --- a/src/qamqp/amqp_frame.h +++ b/src/qamqp/amqp_frame.h @@ -33,6 +33,21 @@ namespace QAMQP */ namespace Frame { + typedef quint16 channel_t; + /*! + @brief Header size in bytes + */ + static const qint64 HEADER_SIZE = 7; + /*! + @brief Frame end indicator size in bytes + */ + static const qint64 FRAME_END_SIZE = 1; + + /*! + @brief Frame end marker + */ + static const quint8 FRAME_END = 0xCE; + /*! @brief Frame type */ @@ -85,9 +100,9 @@ namespace QAMQP All frames start with a 7-octet header composed of a type field (octet), a channel field (short integer) and a size field (long integer): @code Frame struct - 0 1 3 7 size+7 size+8 + 0 1 3 7 size+7 size+8 +------+---------+---------+ +-------------+ +-----------+ - | type | channel | size | | payload | | frame-end | + | type | channel | size | | payload | | frame-end | +------+---------+---------+ +-------------+ +-----------+ @endcode octet short long 'size' octets octet @@ -167,11 +182,11 @@ namespace QAMQP Method frame bodies consist of an invariant list of data fields, called "arguments". All method bodies start with identifier numbers for the class and method: @code Frame struct - 0 2 4 + 0 2 4 +----------+-----------+-------------- - - | class-id | method-id | arguments... +----------+-----------+-------------- - - - short short ... + short short ... @endcode */ class Method : public Base @@ -239,7 +254,7 @@ namespace QAMQP +----------+--------+-----------+----------------+------------- - - | class-id | weight | body size | property flags | property list... +----------+--------+-----------+----------------+------------- - - - short short long long short remainder... + short short long long short remainder... @endcode | Property | Description | @@ -379,6 +394,24 @@ namespace QAMQP void writePayload(QDataStream & stream) const; void readPayload(QDataStream & stream); }; + + class MethodHandler + { + public: + virtual void _q_method(const QAMQP::Frame::Method & frame) = 0; + }; + + class ContentHandler + { + public: + virtual void _q_content(const QAMQP::Frame::Content & frame) = 0; + }; + + class ContentBodyHandler + { + public: + virtual void _q_body(const QAMQP::Frame::ContentBody & frame) = 0; + }; } } diff --git a/src/qamqp/amqp_global.h b/src/qamqp/amqp_global.h index 89c3fbc..0bc9e3c 100644 --- a/src/qamqp/amqp_global.h +++ b/src/qamqp/amqp_global.h @@ -13,7 +13,7 @@ #define AMQPPSWD "guest" #define FRAME_MAX 131072 -#define QAMQP_VERSION "0.1.2" +#define QAMQP_VERSION "0.2.0" diff --git a/src/qamqp/amqp_network.cpp b/src/qamqp/amqp_network.cpp index ec8ad27..cc3bea8 100644 --- a/src/qamqp/amqp_network.cpp +++ b/src/qamqp/amqp_network.cpp @@ -1,20 +1,16 @@ #include "amqp_network.h" #include #include +#include QAMQP::Network::Network( QObject * parent /*= 0*/ ):QObject(parent) { qRegisterMetaType("QAMQP::Frame::Method"); - - buffer_ = new QBuffer(this); - offsetBuf = 0; - leftSize = 0; + buffer_.reserve(Frame::HEADER_SIZE); timeOut_ = 1000; connect_ = false; - buffer_->open(QIODevice::ReadWrite); - initSocket(false); } @@ -97,54 +93,59 @@ void QAMQP::Network::error( QAbstractSocket::SocketError socketError ) void QAMQP::Network::readyRead() { - QDataStream streamA(socket_); - QDataStream streamB(buffer_); - - - while(!socket_->atEnd()) + while(socket_->bytesAvailable() >= Frame::HEADER_SIZE) { - if(leftSize == 0) + char* headerData = buffer_.data(); + socket_->peek(headerData, Frame::HEADER_SIZE); + const quint32 payloadSize = qFromBigEndian(*(quint32*)&headerData[3]); + const qint64 readSize = Frame::HEADER_SIZE+payloadSize+Frame::FRAME_END_SIZE; + if(socket_->bytesAvailable() >= readSize) { - lastType_ = 0; - qint16 channel_ = 0; - leftSize = 0; - offsetBuf = 0; + buffer_.resize(readSize); + socket_->read(buffer_.data(), readSize); + const char* bufferData = buffer_.constData(); + const quint8 type = *(quint8*)&bufferData[0]; + const quint8 magic = *(quint8*)&bufferData[Frame::HEADER_SIZE+payloadSize]; + if(magic != QAMQP::Frame::FRAME_END) + { + qWarning() << "Wrong end frame"; + } - streamA >> lastType_; - streamB << lastType_; - streamA >> channel_; - streamB << channel_; - streamA >> leftSize; - streamB << leftSize; - leftSize++; - } - - QByteArray data_; - data_.resize(leftSize); - offsetBuf = streamA.readRawData(data_.data(), data_.size()); - leftSize -= offsetBuf; - streamB.writeRawData(data_.data(), offsetBuf); - if(leftSize == 0) - { - buffer_->reset(); - switch(QAMQP::Frame::Type(lastType_)) + QDataStream streamB(&buffer_, QIODevice::ReadOnly); + switch(QAMQP::Frame::Type(type)) { case QAMQP::Frame::ftMethod: { QAMQP::Frame::Method frame(streamB); - emit method(frame); + if(frame.methodClass() == QAMQP::Frame::fcConnection) + { + m_pMethodHandlerConnection->_q_method(frame); + } + else + { + foreach(Frame::MethodHandler* pMethodHandler, m_methodHandlersByChannel[frame.channel()]) + { + pMethodHandler->_q_method(frame); + } + } } break; case QAMQP::Frame::ftHeader: { QAMQP::Frame::Content frame(streamB); - emit content(frame); + foreach(Frame::ContentHandler* pMethodHandler, m_contentHandlerByChannel[frame.channel()]) + { + pMethodHandler->_q_content(frame); + } } break; case QAMQP::Frame::ftBody: { QAMQP::Frame::ContentBody frame(streamB); - emit body(frame.channel(), frame.body()); + foreach(Frame::ContentBodyHandler* pMethodHandler, m_bodyHandlersByChannel[frame.channel()]) + { + pMethodHandler->_q_body(frame); + } } break; case QAMQP::Frame::ftHeartbeat: @@ -153,9 +154,12 @@ void QAMQP::Network::readyRead() } break; default: - qWarning("AMQP: Unknown frame type"); + qWarning() << "AMQP: Unknown frame type: " << type; } - buffer_->reset(); + } + else + { + break; } } } @@ -194,8 +198,7 @@ void QAMQP::Network::initSocket( bool ssl /*= false*/ ) socket_ = new QSslSocket(this); QSslSocket * ssl_= static_cast (socket_.data()); ssl_->setProtocol(QSsl::AnyProtocol); - connect(socket_, SIGNAL(sslErrors(const QList &)), - this, SLOT(sslErrors())); + connect(socket_, SIGNAL(sslErrors(const QList &)), this, SLOT(sslErrors())); connect(socket_, SIGNAL(connected()), this, SLOT(conectionReady())); #else @@ -251,3 +254,23 @@ QAbstractSocket::SocketState QAMQP::Network::state() const } } + +void QAMQP::Network::setMethodHandlerConnection(Frame::MethodHandler* pMethodHandlerConnection) +{ + m_pMethodHandlerConnection = pMethodHandlerConnection; +} + +void QAMQP::Network::addMethodHandlerForChannel(Channel channel, Frame::MethodHandler* pHandler) +{ + m_methodHandlersByChannel[channel].append(pHandler); +} + +void QAMQP::Network::addContentHandlerForChannel(Channel channel, Frame::ContentHandler* pHandler) +{ + m_contentHandlerByChannel[channel].append(pHandler); +} + +void QAMQP::Network::addContentBodyHandlerForChannel(Channel channel, Frame::ContentBodyHandler* pHandler) +{ + m_bodyHandlersByChannel[channel].append(pHandler); +} diff --git a/src/qamqp/amqp_network.h b/src/qamqp/amqp_network.h index b34a63f..9dd6f41 100644 --- a/src/qamqp/amqp_network.h +++ b/src/qamqp/amqp_network.h @@ -18,6 +18,8 @@ namespace QAMQP Q_OBJECT Q_DISABLE_COPY(Network) public: + typedef qint16 Channel; + Network(QObject * parent = 0); ~Network(); @@ -34,6 +36,11 @@ namespace QAMQP QAbstractSocket::SocketState state() const; + void setMethodHandlerConnection(Frame::MethodHandler* pMethodHandlerConnection); + void addMethodHandlerForChannel(Channel channel, Frame::MethodHandler* pHandler); + void addContentHandlerForChannel(Channel channel, Frame::ContentHandler* pHandler); + void addContentBodyHandlerForChannel(Channel channel, Frame::ContentBodyHandler* pHandler); + public slots: void connectTo(const QString & host = QString(), quint32 port = 0); void error( QAbstractSocket::SocketError socketError ); @@ -41,9 +48,6 @@ namespace QAMQP signals: void connected(); void disconnected(); - void method(const QAMQP::Frame::Method & method); - void content(const QAMQP::Frame::Content & content); - void body(int channeNumber, const QByteArray & body); private slots: void readyRead(); @@ -56,16 +60,18 @@ namespace QAMQP private: void initSocket(bool ssl = false); QPointer socket_; - QPointer buffer_; + QByteArray buffer_; QString lastHost_; int lastPort_; - int offsetBuf; - int leftSize; - qint8 lastType_; bool autoReconnect_; int timeOut_; - bool connect_; + + Frame::MethodHandler* m_pMethodHandlerConnection; + + QHash > m_methodHandlersByChannel; + QHash > m_contentHandlerByChannel; + QHash > m_bodyHandlersByChannel; }; } #endif // amqp_network_h__ diff --git a/src/qamqp/amqp_queue.cpp b/src/qamqp/amqp_queue.cpp index 2d24b01..e093ef3 100644 --- a/src/qamqp/amqp_queue.cpp +++ b/src/qamqp/amqp_queue.cpp @@ -54,10 +54,10 @@ void Queue::onOpen() } if(!d->delayedBindings.isEmpty()) { - QMap::iterator i; - for(i = d->delayedBindings.begin(); i!= d->delayedBindings.end(); ++i ) + typedef QPair BindingPair; + foreach(BindingPair binding, d->delayedBindings) { - d->bind(i.value(), i.key()); + d->bind(binding.first, binding.second); } d->delayedBindings.clear(); } @@ -129,6 +129,15 @@ void Queue::unbind( Exchange * exchange, const QString & key ) pd_func()->unbind(exchange->name(), key); } +void Queue::_q_content(const Content &frame) +{ + pd_func()->_q_content(frame); +} + +void Queue::_q_body(const ContentBody &frame) +{ + pd_func()->_q_body(frame); +} QAMQP::MessagePtr Queue::getMessage() { @@ -365,7 +374,7 @@ void QueuePrivate::bind( const QString & exchangeName, const QString & key ) { if(!opened) { - delayedBindings[exchangeName] = key; + delayedBindings.append(QPair(exchangeName, key)); return; } @@ -530,6 +539,7 @@ void QueuePrivate::deliver( const QAMQP::Frame::Method & frame ) void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame ) { + Q_ASSERT(frame.channel() == number); if(frame.channel() != number) return; if(messages_.isEmpty()) @@ -537,7 +547,7 @@ void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame ) qErrnoWarning("Received content-header without method frame before"); return; } - MessagePtr &message = messages_.head(); + MessagePtr &message = messages_.last(); message->leftSize = frame.bodySize(); QHash::ConstIterator i; for (i = frame.properties_.begin(); i != frame.properties_.end(); ++i) @@ -546,9 +556,10 @@ void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame ) } } -void QueuePrivate::_q_body( int channeNumber, const QByteArray & body ) +void QueuePrivate::_q_body(const QAMQP::Frame::ContentBody & frame) { - if(channeNumber!= number) + Q_ASSERT(frame.channel() == number); + if(frame.channel() != number) return; if(messages_.isEmpty()) @@ -556,12 +567,12 @@ void QueuePrivate::_q_body( int channeNumber, const QByteArray & body ) qErrnoWarning("Received content-body without method frame before"); return; } - MessagePtr &message = messages_.head(); - message->payload.append(body); - message->leftSize -= body.size(); + MessagePtr &message = messages_.last(); + message->payload.append(frame.body()); + message->leftSize -= frame.body().size(); if(message->leftSize == 0 && messages_.size() == 1) { - QMetaObject::invokeMethod(pq_func(), "messageReceived"); + emit pq_func()->messageReceived(pq_func()); } } diff --git a/src/qamqp/amqp_queue.h b/src/qamqp/amqp_queue.h index de8a3a1..baeab12 100644 --- a/src/qamqp/amqp_queue.h +++ b/src/qamqp/amqp_queue.h @@ -10,7 +10,7 @@ namespace QAMQP class ClientPrivate; class Exchange; class QueuePrivate; - class Queue : public Channel + class Queue : public Channel, public Frame::ContentHandler, public Frame::ContentBodyHandler { Q_OBJECT Queue(int channelNumber = -1, Client * parent = 0); @@ -77,12 +77,12 @@ namespace QAMQP void declared(); void binded(bool); void removed(); - void messageReceived(); + void messageReceived(QAMQP::Queue* pQueue); void empty(); private: - Q_PRIVATE_SLOT(pd_func(), void _q_content(const QAMQP::Frame::Content & frame)) - Q_PRIVATE_SLOT(pd_func(), void _q_body(int channeNumber, const QByteArray & body)) + void _q_content(const QAMQP::Frame::Content & frame); + void _q_body(const QAMQP::Frame::ContentBody & frame); }; } #ifdef QAMQP_P_INCLUDE diff --git a/src/qamqp/amqp_queue_p.h b/src/qamqp/amqp_queue_p.h index 73daa53..79f9321 100644 --- a/src/qamqp/amqp_queue_p.h +++ b/src/qamqp/amqp_queue_p.h @@ -58,13 +58,13 @@ namespace QAMQP bool noAck; QString consumerTag; - QMap delayedBindings; + QQueue > delayedBindings; QQueue messages_; bool recievingMessage; void _q_content(const QAMQP::Frame::Content & frame); - void _q_body(int channeNumber, const QByteArray & body); + void _q_body(const QAMQP::Frame::ContentBody & frame); }; } -#endif // amqp_queue_p_h__ \ No newline at end of file +#endif // amqp_queue_p_h__