From c3150322c1328c0dda0f3f112e43ae92c68dc4d0 Mon Sep 17 00:00:00 2001 From: fuCtor Date: Sat, 18 Feb 2012 03:40:24 -0800 Subject: [PATCH] --- src/qamqp/amqp.cpp | 6 ++ src/qamqp/amqp_channel.cpp | 10 +++ src/qamqp/amqp_channel.h | 4 +- src/qamqp/amqp_channel_p.h | 4 +- src/qamqp/amqp_connection.cpp | 26 +++++- src/qamqp/amqp_connection.h | 4 + src/qamqp/amqp_connection_p.h | 3 + src/qamqp/amqp_exchange.cpp | 10 +-- src/qamqp/amqp_frame.cpp | 71 +++++++++++++++- src/qamqp/amqp_frame.h | 3 + src/qamqp/amqp_network.cpp | 8 ++ src/qamqp/amqp_network.h | 2 + src/qamqp/amqp_queue.cpp | 152 +++++++++++++++++++++++++++------- src/qamqp/amqp_queue.h | 17 +++- src/qamqp/amqp_queue_p.h | 23 ++++- 15 files changed, 292 insertions(+), 51 deletions(-) diff --git a/src/qamqp/amqp.cpp b/src/qamqp/amqp.cpp index 7969b1f..4e6c1a1 100644 --- a/src/qamqp/amqp.cpp +++ b/src/qamqp/amqp.cpp @@ -123,6 +123,12 @@ Queue * ClientPrivate::createQueue(int channelNumber, const QString &name ) QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)), queue_, SLOT(_q_method(const QAMQP::Frame::Method &))); + QObject::connect(network_, SIGNAL(content(const QAMQP::Frame::Content &)), + queue_, SLOT(_q_content(const QAMQP::Frame::Content &))); + + QObject::connect(network_, SIGNAL(body(int, const QByteArray &)), + queue_, SLOT(_q_body(int, const QByteArray &))); + QObject::connect(connection_, SIGNAL(connected()), queue_, SLOT(_q_open())); queue_->setName(name); return queue_; diff --git a/src/qamqp/amqp_channel.cpp b/src/qamqp/amqp_channel.cpp index 099e7d2..175794b 100644 --- a/src/qamqp/amqp_channel.cpp +++ b/src/qamqp/amqp_channel.cpp @@ -128,6 +128,11 @@ void QAMQP::Channel::onClose() { } + +void QAMQP::Channel::setQOS( qint32 prefetchSize, quint16 prefetchCount ) +{ + d_func()->setQOS(prefetchSize, prefetchCount); +} ////////////////////////////////////////////////////////////////////////// ChannelPrivate::ChannelPrivate(int version /* = QObjectPrivateVersion */) @@ -290,4 +295,9 @@ void ChannelPrivate::openOk( const QAMQP::Frame::Method & frame ) q->stateChanged(csOpened); q->onOpen(); +} + +void ChannelPrivate::setQOS( qint32 prefetchSize, quint16 prefetchCount ) +{ + client_->d_func()->connection_->d_func()->setQOS(prefetchSize, prefetchCount, number, false); } \ No newline at end of file diff --git a/src/qamqp/amqp_channel.h b/src/qamqp/amqp_channel.h index e76e60f..5edf5cd 100644 --- a/src/qamqp/amqp_channel.h +++ b/src/qamqp/amqp_channel.h @@ -14,7 +14,7 @@ namespace QAMQP Q_OBJECT Q_PROPERTY(int number READ channelNumber); - Q_PROPERTY(QString name READ name WRITE setName); + Q_PROPERTY(QString name READ name WRITE setName); Q_DECLARE_PRIVATE(QAMQP::Channel) Q_DISABLE_COPY(Channel) @@ -29,7 +29,7 @@ namespace QAMQP void setParam(int param); void setName(const QString &name); - + void setQOS(qint32 prefetchSize, quint16 prefetchCount); bool isOpened() const; signals: diff --git a/src/qamqp/amqp_channel_p.h b/src/qamqp/amqp_channel_p.h index e4bf2e6..95317e2 100644 --- a/src/qamqp/amqp_channel_p.h +++ b/src/qamqp/amqp_channel_p.h @@ -64,8 +64,8 @@ namespace QAMQP virtual void _q_method(const QAMQP::Frame::Method & frame); void _q_open(); - - + + void setQOS(qint32 prefetchSize, quint16 prefetchCount); void sendFrame(const QAMQP::Frame::Base & frame); QPointer client_; diff --git a/src/qamqp/amqp_connection.cpp b/src/qamqp/amqp_connection.cpp index 1bed0a7..69ece86 100644 --- a/src/qamqp/amqp_connection.cpp +++ b/src/qamqp/amqp_connection.cpp @@ -4,6 +4,7 @@ #include "amqp_p.h" #include "amqp_frame.h" + #include #include #include @@ -216,6 +217,23 @@ void ConnectionPrivate::closeOk( const QAMQP::Frame::Method & ) QMetaObject::invokeMethod(q_func(), "disconnected"); } + +void ConnectionPrivate::setQOS( qint32 prefetchSize, quint16 prefetchCount, int channel, bool global ) +{ + QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, 10); + frame.setChannel(channel); + QByteArray arguments_; + QDataStream out(&arguments_, QIODevice::WriteOnly); + + out << prefetchSize; + out << prefetchCount; + out << qint8(global ? 1 : 0); + + frame.setArguments(arguments_); + client_->d_func()->network_->sendFrame(frame); +} + + void ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame ) { if(frame.methodClass() != QAMQP::Frame::fcConnection) @@ -332,4 +350,10 @@ void Connection::openOk() bool Connection::isConnected() const { return d_func()->connected; -} \ No newline at end of file +} + + +void Connection::setQOS( qint32 prefetchSize, quint16 prefetchCount ) +{ + d_func()->setQOS(prefetchSize, prefetchSize, 0, true); +} diff --git a/src/qamqp/amqp_connection.h b/src/qamqp/amqp_connection.h index 0c7bd9e..34bca6d 100644 --- a/src/qamqp/amqp_connection.h +++ b/src/qamqp/amqp_connection.h @@ -8,6 +8,7 @@ namespace QAMQP { class ConnectionPrivate; + class ChannelPrivate; class ClientPrivate; class Client; class Connection : public QObject @@ -28,6 +29,8 @@ namespace QAMQP bool isConnected() const; + void setQOS(qint32 prefetchSize, quint16 prefetchCount); + Q_SIGNALS: void disconnected(); void connected(); @@ -36,6 +39,7 @@ namespace QAMQP private: void openOk(); friend class ClientPrivate; + friend class ChannelPrivate; Q_PRIVATE_SLOT(d_func(), void _q_method(const QAMQP::Frame::Method & frame)) }; } diff --git a/src/qamqp/amqp_connection_p.h b/src/qamqp/amqp_connection_p.h index 65d1159..d93c219 100644 --- a/src/qamqp/amqp_connection_p.h +++ b/src/qamqp/amqp_connection_p.h @@ -39,6 +39,9 @@ namespace QAMQP void close(const QAMQP::Frame::Method & frame); void closeOk(const QAMQP::Frame::Method & frame); void _q_method(const QAMQP::Frame::Method & frame); + + void setQOS(qint32 prefetchSize, quint16 prefetchCount, int channel, bool global); + QPointer client_; bool closed_; bool connected; diff --git a/src/qamqp/amqp_exchange.cpp b/src/qamqp/amqp_exchange.cpp index 937823d..09c524f 100644 --- a/src/qamqp/amqp_exchange.cpp +++ b/src/qamqp/amqp_exchange.cpp @@ -143,16 +143,16 @@ void ExchangePrivate::_q_method( const QAMQP::Frame::Method & frame ) void ExchangePrivate::declareOk( const QAMQP::Frame::Method & frame ) { - qDebug() << "Declared exchange: " << name; - QMetaObject::invokeMethod(q_func(), "declared"); + qDebug() << "Declared exchange: " << name; declared = true; + QMetaObject::invokeMethod(q_func(), "declared"); } void ExchangePrivate::deleteOk( const QAMQP::Frame::Method & frame ) { - qDebug() << "Deleted exchange: " << name; - QMetaObject::invokeMethod(q_func(), "removed"); + qDebug() << "Deleted exchange: " << name; declared = false; + QMetaObject::invokeMethod(q_func(), "removed"); } void ExchangePrivate::declare( ) @@ -189,7 +189,7 @@ void ExchangePrivate::remove( bool ifUnused /*= true*/, bool noWait /*= true*/ ) QByteArray arguments_; QDataStream stream(&arguments_, QIODevice::WriteOnly); - stream << qint8(0); //reserver 1 + stream << qint16(0); //reserver 1 writeField('s', stream, name); qint8 flag = 0; diff --git a/src/qamqp/amqp_frame.cpp b/src/qamqp/amqp_frame.cpp index 170d462..6fbd38b 100644 --- a/src/qamqp/amqp_frame.cpp +++ b/src/qamqp/amqp_frame.cpp @@ -185,12 +185,26 @@ QVariant QAMQP::Frame::readField( qint8 valueType, QDataStream &s ) value = QVariant::fromValue(*reinterpret_cast(octet4)); break; case 'L': + { + qlonglong v = 0 ; + s >> v; + value = v; + } + /* s.readRawData(octet8, sizeof(octet8)); - value = QVariant::fromValue(*reinterpret_cast(octet8)); + value = QVariant::fromValue(*reinterpret_cast(octet8));*/ + break; case 'l': + { + qulonglong v = 0 ; + s >> v; + value = v; + } + /* s.readRawData(octet8, sizeof(octet8)); - value = QVariant::fromValue(*reinterpret_cast(octet8)); + value = QVariant::fromValue(*reinterpret_cast(octet8));*/ + break; case 'f': s.readRawData(octet4, sizeof(octet4)); @@ -478,9 +492,9 @@ QAMQP::Frame::Content::Content( MethodClass methodClass ):Base(ftHeader) methodClass_ = methodClass; } -QAMQP::Frame::Content::Content( QDataStream& raw ):Base(ftHeader) +QAMQP::Frame::Content::Content( QDataStream& raw ): Base(raw) { - + readPayload(raw); } QAMQP::Frame::MethodClass QAMQP::Frame::Content::methodClass() const @@ -544,9 +558,58 @@ void QAMQP::Frame::Content::writePayload( QDataStream & out ) const void QAMQP::Frame::Content::readPayload( QDataStream & in ) { + in >> methodClass_; + in.skipRawData(2); //weight + in >> bodySize_; + qint16 flags_ = 0; + in >> flags_; + if(flags_ & cpContentType) + properties_[cpContentType] = readField('s', in); + if(flags_ & cpContentEncoding) + properties_[cpContentEncoding] = readField('s', in); + + if(flags_ & cpHeaders) + properties_[cpHeaders] = readField('f', in); + + if(flags_ & cpDeliveryMode) + properties_[cpDeliveryMode] = readField('b', in); + + if(flags_ & cpPriority) + properties_[cpPriority] = readField('b', in); + + if(flags_ & cpCorrelationId) + properties_[cpCorrelationId] = readField('s', in); + + if(flags_ & cpReplyTo) + properties_[cpReplyTo] = readField('s', in); + + if(flags_ & cpExpiration) + properties_[cpExpiration] = readField('s', in); + + if(flags_ & cpMessageId) + properties_[cpMessageId] = readField('s', in); + + if(flags_ & cpTimestamp) + properties_[cpTimestamp] = readField('T', in); + + if(flags_ & cpType) + properties_[cpType] = readField('s', in); + + if(flags_ & cpUserId) + properties_[cpUserId] = readField('s', in); + + if(flags_ & cpAppId) + properties_[cpAppId] = readField('s', in); + + if(flags_ & cpClusterID) + properties_[cpClusterID] = readField('s', in); } +qlonglong QAMQP::Frame::Content::bodySize() const +{ + return body_.isEmpty() ? bodySize_ : body_.size(); +} ////////////////////////////////////////////////////////////////////////// ContentBody::ContentBody() : Base(ftBody) diff --git a/src/qamqp/amqp_frame.h b/src/qamqp/amqp_frame.h index 6846fe0..cb195d9 100644 --- a/src/qamqp/amqp_frame.h +++ b/src/qamqp/amqp_frame.h @@ -128,6 +128,7 @@ namespace QAMQP cpAppId = AMQP_BASIC_APP_ID_FLAG, cpClusterID = AMQP_BASIC_CLUSTER_ID_FLAG }; + Q_DECLARE_FLAGS(Properties, Property) Content(); Content(MethodClass methodClass); @@ -140,6 +141,7 @@ namespace QAMQP void setBody(const QByteArray & data); QByteArray body() const; + qlonglong bodySize() const; protected: void writePayload(QDataStream & stream) const; @@ -149,6 +151,7 @@ namespace QAMQP QByteArray body_; mutable QByteArray buffer_; QHash properties_; + qlonglong bodySize_; }; class ContentBody : public Base diff --git a/src/qamqp/amqp_network.cpp b/src/qamqp/amqp_network.cpp index 2a040bb..afb7389 100644 --- a/src/qamqp/amqp_network.cpp +++ b/src/qamqp/amqp_network.cpp @@ -94,6 +94,14 @@ void QAMQP::Network::readyRead() emit method(frame); } break; + case QAMQP::Frame::ftHeader: + { + QAMQP::Frame::Content frame(streamB); + emit content(frame); + } + break; + case QAMQP::Frame::ftBody: + break; default: qWarning("Unknown frame type"); } diff --git a/src/qamqp/amqp_network.h b/src/qamqp/amqp_network.h index dc2c96c..3b9498d 100644 --- a/src/qamqp/amqp_network.h +++ b/src/qamqp/amqp_network.h @@ -26,6 +26,8 @@ namespace QAMQP signals: void method(const QAMQP::Frame::Method & method); + void content(const QAMQP::Frame::Content & content); + void body(int channeNumber, const QByteArray & body); private slots: void connected(); diff --git a/src/qamqp/amqp_queue.cpp b/src/qamqp/amqp_queue.cpp index f5ccb3b..bf68534 100644 --- a/src/qamqp/amqp_queue.cpp +++ b/src/qamqp/amqp_queue.cpp @@ -125,14 +125,14 @@ void Queue::get() } -void Queue::consume() +void Queue::consume(QueueOptions options) { - + d_func()->consume(options); } void Queue::setConsumerTag( const QString &consumerTag ) { - d_func()->setConsumerTag(consumerTag); + d_func()->consumerTag = consumerTag; } QString Queue::consumerTag() const @@ -147,6 +147,7 @@ QueuePrivate::QueuePrivate() :ChannelPrivate() , deleyedDeclare(false) , declared(false) + , recievingMessage(false) { } @@ -160,36 +161,54 @@ QueuePrivate::~QueuePrivate() void QueuePrivate::_q_method( const QAMQP::Frame::Method & frame ) { ChannelPrivate::_q_method(frame); - if(frame.methodClass() != QAMQP::Frame::fcQueue - || frame.channel() != number ) + if(frame.channel() != number) return; - switch(frame.id()) + if(frame.methodClass() == QAMQP::Frame::fcQueue) { - case miDeclareOk: - declareOk(frame); - break; - case miDelete: - deleteOk(frame); - break; - case miBindOk: - bindOk(frame); - break; - case miUnbindOk: - unbindOk(frame); - break; - case miPurgeOk: - deleteOk(frame); - break; - default: - break; + switch(frame.id()) + { + case miDeclareOk: + declareOk(frame); + break; + case miDelete: + deleteOk(frame); + break; + case miBindOk: + bindOk(frame); + break; + case miUnbindOk: + unbindOk(frame); + break; + case miPurgeOk: + deleteOk(frame); + break; + default: + break; + } } + + if(frame.methodClass() == QAMQP::Frame::fcBasic) + { + switch(frame.id()) + { + case bmConsumeOk: + consumeOk(frame); + break; + case bmDeliver: + deliver(frame); + break; + default: + break; + } + } + + } void QueuePrivate::declareOk( const QAMQP::Frame::Method & frame ) { - qDebug() << "Declared queue: " << name; - QMetaObject::invokeMethod(q_func(), "declared"); + qDebug() << "Declared queue: " << name; declared = true; QByteArray data = frame.arguments(); @@ -199,12 +218,13 @@ void QueuePrivate::declareOk( const QAMQP::Frame::Method & frame ) qint32 messageCount = 0, consumerCount = 0; stream >> messageCount >> consumerCount; qDebug("Message count %d\nConsumer count: %d", messageCount, consumerCount); + + QMetaObject::invokeMethod(q_func(), "declared"); } void QueuePrivate::deleteOk( const QAMQP::Frame::Method & frame ) { - qDebug() << "Deleted or purged queue: " << name; - QMetaObject::invokeMethod(q_func(), "removed"); + qDebug() << "Deleted or purged queue: " << name; declared = false; QByteArray data = frame.arguments(); @@ -212,6 +232,7 @@ void QueuePrivate::deleteOk( const QAMQP::Frame::Method & frame ) qint32 messageCount = 0; stream >> messageCount; qDebug("Message count %d", messageCount); + QMetaObject::invokeMethod(q_func(), "removed"); } @@ -255,12 +276,12 @@ void QueuePrivate::remove( bool ifUnused /*= true*/, bool ifEmpty /*= true*/, bo if(!declared) return; - QAMQP::Frame::Method frame(QAMQP::Frame::fcExchange, miDelete); + QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miDelete); frame.setChannel(number); QByteArray arguments_; QDataStream out(&arguments_, QIODevice::WriteOnly); - out << qint8(0); //reserver 1 + out << qint16(0); //reserver 1 writeField('s', out, name); qint8 flag = 0; @@ -282,13 +303,13 @@ void QueuePrivate::purge() return; } - QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miBind); + QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miPurge); frame.setChannel(number); QByteArray arguments_; QDataStream out(&arguments_, QIODevice::WriteOnly); out << qint16(0); //reserver 1 writeField('s', out, name); - out << qint8(1); // no-wait + out << qint8(0); // no-wait frame.setArguments(arguments_); sendFrame(frame); @@ -340,7 +361,74 @@ void QueuePrivate::unbind( const QString & exchangeName, const QString & key ) } -void QueuePrivate::setConsumerTag( const QString &consumerTag ) + +void QueuePrivate::consume( Queue::QueueOptions options ) +{ + if(!opened) + { + return; + } + + QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmConsume); + frame.setChannel(number); + QByteArray arguments_; + QDataStream out(&arguments_, QIODevice::WriteOnly); + out << qint16(0); //reserver 1 + writeField('s', out, name); + writeField('s', out, consumerTag); + out << qint8(options); // no-wait + writeField('F', out, TableField()); + + frame.setArguments(arguments_); + sendFrame(frame); +} + + +void QueuePrivate::consumeOk( const QAMQP::Frame::Method & frame ) +{ + qDebug() << "Consume ok: " << name; + declared = false; + + QByteArray data = frame.arguments(); + QDataStream stream(&data, QIODevice::ReadOnly); + consumerTag = readField('s',stream).toString(); + qDebug("Consumer tag = %s", qPrintable(consumerTag)); +} + + +void QueuePrivate::deliver( const QAMQP::Frame::Method & frame ) +{ + qDebug() << "* Receive message: "; + declared = false; + + QByteArray data = frame.arguments(); + QDataStream in(&data, QIODevice::ReadOnly); + QString consumer_ = readField('s',in).toString(); + if(consumer_ != consumerTag) + { + return; + } + + qlonglong deliveryTag = readField('L',in).toLongLong(); + bool redelivered = readField('t',in).toBool(); + QString exchangeName = readField('s',in).toString(); + QString routingKey = readField('s',in).toString(); + + qDebug() << "| Delivery-tag: " << deliveryTag; + qDebug() << "| Redelivered: " << redelivered; + qDebug("| Exchange-name: %s", qPrintable(exchangeName)); + qDebug("| Routing-key: %s", qPrintable(routingKey)); +} + +void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame ) +{ + if(frame.channel() != number) + return; + qDebug() << "Content-type: " << qPrintable(frame.property(Content::cpContentType).toString()); + qDebug() << "Encoding-type: " << qPrintable(frame.property(Content::cpContentEncoding).toString()); +} + +void QueuePrivate::_q_body( int channeNumber, const QByteArray & body ) { } \ No newline at end of file diff --git a/src/qamqp/amqp_queue.h b/src/qamqp/amqp_queue.h index 53ce5f1..ebad3b0 100644 --- a/src/qamqp/amqp_queue.h +++ b/src/qamqp/amqp_queue.h @@ -35,6 +35,15 @@ namespace QAMQP NoWait = 0x10 }; Q_DECLARE_FLAGS(QueueOptions, QueueOption) + + enum ConsumeOption { + coNoLocal = 0x1, + coNoAck = 0x02, + coExclusive = 0x04, + coNoWait = 0x8 + }; + Q_DECLARE_FLAGS(ConsumeOptions, ConsumeOption) + ~Queue(); QueueOptions option() const; @@ -52,7 +61,7 @@ namespace QAMQP void unbind(Exchange * exchange, const QString & key); void get(); - void consume(); + void consume(ConsumeOptions options = NoOptions); void setConsumerTag(const QString &consumerTag); QString consumerTag() const; @@ -60,6 +69,12 @@ namespace QAMQP void declared(); void binded(bool); void removed(); + private: + Q_PRIVATE_SLOT(d_func(), void _q_content(const QAMQP::Frame::Content & frame)) + Q_PRIVATE_SLOT(d_func(), void _q_body(int channeNumber, const QByteArray & body)) }; } +#ifdef QAMQP_P_INCLUDE +# include "amqp_queue_p.h" +#endif #endif // amqp_queue_h__ \ No newline at end of file diff --git a/src/qamqp/amqp_queue_p.h b/src/qamqp/amqp_queue_p.h index 9139851..fac5ef9 100644 --- a/src/qamqp/amqp_queue_p.h +++ b/src/qamqp/amqp_queue_p.h @@ -1,3 +1,6 @@ +#ifndef amqp_queue_p_h__ +#define amqp_queue_p_h__ + #include "amqp_channel_p.h" #define METHOD_ID_ENUM(name, id) name = id, name ## Ok @@ -27,13 +30,19 @@ namespace QAMQP void bind(const QString & exchangeName, const QString & key); void unbind(const QString & exchangeName, const QString & key); - void setConsumerTag( const QString &consumerTag ); - void declareOk(const QAMQP::Frame::Method & frame); void deleteOk(const QAMQP::Frame::Method & frame); void bindOk(const QAMQP::Frame::Method & frame); void unbindOk(const QAMQP::Frame::Method & frame); + /************************************************************************/ + /* CLASS BASIC METHODS */ + /************************************************************************/ + + void consume(Queue::ConsumeOptions options); + void consumeOk(const QAMQP::Frame::Method & frame); + + void deliver(const QAMQP::Frame::Method & frame); QString type; Queue::QueueOptions options; @@ -45,7 +54,13 @@ namespace QAMQP QString consumerTag; QMap delayedBindings; + + + bool recievingMessage; + + void _q_content(const QAMQP::Frame::Content & frame); + void _q_body(int channeNumber, const QByteArray & body); }; - -} \ No newline at end of file +} +#endif // amqp_queue_p_h__ \ No newline at end of file