diff --git a/src/qamqp/amqp_exchange.cpp b/src/qamqp/amqp_exchange.cpp index 09c524f..e0e713e 100644 --- a/src/qamqp/amqp_exchange.cpp +++ b/src/qamqp/amqp_exchange.cpp @@ -104,6 +104,13 @@ void Exchange::publish( const QString & message, const QString & key ) d_func()->publish(message.toUtf8(), key); } + +void Exchange::publish( const QByteArray & message, const QString & key, const QString &mimeType ) +{ + d_func()->publish(message, key, mimeType); +} + + ////////////////////////////////////////////////////////////////////////// @@ -221,15 +228,20 @@ void ExchangePrivate::publish( const QByteArray & message, const QString & key, QAMQP::Frame::Content content(QAMQP::Frame::fcBasic); content.setChannel(number); - content.setProperty(Content::cpContentType, "text/plain"); + content.setProperty(Content::cpContentType, mimeType); content.setProperty(Content::cpContentEncoding, "utf-8"); content.setProperty(Content::cpMessageId, "0"); content.setBody(message); sendFrame(content); - - QAMQP::Frame::ContentBody body; - body.setChannel(number); - body.setBody(message); - sendFrame(body); + int fullSize = message.size(); + for (int sended_ = 0; sended_ < fullSize; sended_+= (FRAME_MAX - 7)) + { + QAMQP::Frame::ContentBody body; + QByteArray partition_ = message.mid(sended_, (FRAME_MAX - 7)); + body.setChannel(number); + body.setBody(partition_); + sendFrame(body); + } + } \ No newline at end of file diff --git a/src/qamqp/amqp_frame.cpp b/src/qamqp/amqp_frame.cpp index 6fbd38b..2b48f17 100644 --- a/src/qamqp/amqp_frame.cpp +++ b/src/qamqp/amqp_frame.cpp @@ -615,6 +615,11 @@ qlonglong QAMQP::Frame::Content::bodySize() const ContentBody::ContentBody() : Base(ftBody) {} +QAMQP::Frame::ContentBody::ContentBody( QDataStream& raw ): Base(raw) +{ + readPayload(raw); +} + void QAMQP::Frame::ContentBody::setBody( const QByteArray & data ) { body_ = data; @@ -632,7 +637,8 @@ void QAMQP::Frame::ContentBody::writePayload( QDataStream & out ) const void QAMQP::Frame::ContentBody::readPayload( QDataStream & in ) { - + body_.resize(size_); + in.readRawData(body_.data(), body_.size()); } qint32 QAMQP::Frame::ContentBody::size() const diff --git a/src/qamqp/amqp_frame.h b/src/qamqp/amqp_frame.h index cb195d9..97b7abe 100644 --- a/src/qamqp/amqp_frame.h +++ b/src/qamqp/amqp_frame.h @@ -22,6 +22,7 @@ namespace QAMQP { + class QueuePrivate; namespace Frame { enum Type @@ -109,6 +110,7 @@ namespace QAMQP class Content : public Base { + friend class QAMQP::QueuePrivate; public: enum Property @@ -158,6 +160,7 @@ namespace QAMQP { public: ContentBody(); + ContentBody(QDataStream& raw); void setBody(const QByteArray & data); QByteArray body() const; qint32 size() const; diff --git a/src/qamqp/amqp_message.h b/src/qamqp/amqp_message.h index e69de29..640078a 100644 --- a/src/qamqp/amqp_message.h +++ b/src/qamqp/amqp_message.h @@ -0,0 +1,26 @@ +#include "amqp_frame.h" +#include +#include +#include + +namespace QAMQP +{ + struct Message + { + Message() + { + leftSize = 0; + } + typedef QAMQP::Frame::Content::Property MessageProperty; + Q_DECLARE_FLAGS(MessageProperties, MessageProperty); + + QByteArray payload; + QHash property; + QAMQP::Frame::TableField headers; + QString routeKey; + QString exchangeName; + int leftSize; + }; + + typedef QSharedPointer MessagePtr; +} \ No newline at end of file diff --git a/src/qamqp/amqp_network.cpp b/src/qamqp/amqp_network.cpp index afb7389..7927432 100644 --- a/src/qamqp/amqp_network.cpp +++ b/src/qamqp/amqp_network.cpp @@ -101,6 +101,10 @@ void QAMQP::Network::readyRead() } break; case QAMQP::Frame::ftBody: + { + QAMQP::Frame::ContentBody frame(streamB); + emit body(frame.channel(), frame.body()); + } break; default: qWarning("Unknown frame type"); diff --git a/src/qamqp/amqp_queue.cpp b/src/qamqp/amqp_queue.cpp index e6c9000..ef0aaf9 100644 --- a/src/qamqp/amqp_queue.cpp +++ b/src/qamqp/amqp_queue.cpp @@ -8,6 +8,7 @@ using namespace QAMQP::Frame; #include #include #include +#include namespace QAMQP { @@ -120,9 +121,21 @@ void Queue::unbind( Exchange * exchange, const QString & key ) d_func()->unbind(exchange->name(), key); } -void Queue::get() + +QAMQP::MessagePtr Queue::getMessage() +{ + return d_func()->messages_.dequeue(); +} + +bool Queue::hasMessage() const { + if(d_func()->messages_.isEmpty()) + { + return false; + } + const MessagePtr &q = d_func()->messages_.head(); + return q->leftSize == 0; } void Queue::consume(ConsumeOptions options) @@ -154,7 +167,7 @@ QueuePrivate::QueuePrivate() QueuePrivate::~QueuePrivate() { - + } @@ -418,17 +431,53 @@ void QueuePrivate::deliver( const QAMQP::Frame::Method & frame ) qDebug() << "| Redelivered: " << redelivered; qDebug("| Exchange-name: %s", qPrintable(exchangeName)); qDebug("| Routing-key: %s", qPrintable(routingKey)); + + + MessagePtr newMessage = MessagePtr(new Message); + newMessage->routeKey = routingKey; + newMessage->exchangeName = exchangeName; + messages_.enqueue(newMessage); + } void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame ) { if(frame.channel() != number) return; + QFile::remove("dump.jpg"); qDebug() << "Content-type: " << qPrintable(frame.property(Content::cpContentType).toString()); qDebug() << "Encoding-type: " << qPrintable(frame.property(Content::cpContentEncoding).toString()); + if(messages_.isEmpty()) + { + qErrnoWarning("Received content-header without method frame before"); + return; + } + MessagePtr &message = messages_.head(); + message->leftSize = frame.bodySize(); + QHash::ConstIterator i; + for (i = frame.properties_.begin(); i != frame.properties_.end(); ++i) + { + message->property[Message::MessageProperty(i.key())]= i.value(); + } } void QueuePrivate::_q_body( int channeNumber, const QByteArray & body ) { + if(channeNumber!= number) + return; + if(messages_.isEmpty()) + { + qErrnoWarning("Received content-body without method frame before"); + return; + } + MessagePtr &message = messages_.head(); + message->payload.append(body); + message->leftSize -= body.size(); + int size = message->leftSize; + + if(message->leftSize == 0 && messages_.size() == 1) + { + QMetaObject::invokeMethod(q_func(), "messageRecieved"); + } } \ No newline at end of file diff --git a/src/qamqp/amqp_queue.h b/src/qamqp/amqp_queue.h index 9d0b259..a537a59 100644 --- a/src/qamqp/amqp_queue.h +++ b/src/qamqp/amqp_queue.h @@ -2,6 +2,7 @@ #define amqp_queue_h__ #include "amqp_channel.h" +#include "amqp_message.h" namespace QAMQP { @@ -60,7 +61,8 @@ namespace QAMQP void unbind(const QString & exchangeName, const QString & key); void unbind(Exchange * exchange, const QString & key); - void get(); + MessagePtr getMessage(); + bool hasMessage() const; void consume(ConsumeOptions options = ConsumeOptions(NoOptions)); void setConsumerTag(const QString &consumerTag); QString consumerTag() const; @@ -69,6 +71,8 @@ namespace QAMQP void declared(); void binded(bool); void removed(); + void messageRecieved(); + private: Q_PRIVATE_SLOT(d_func(), void _q_content(const QAMQP::Frame::Content & frame)) Q_PRIVATE_SLOT(d_func(), void _q_body(int channeNumber, const QByteArray & body)) diff --git a/src/qamqp/amqp_queue_p.h b/src/qamqp/amqp_queue_p.h index fac5ef9..6648e4b 100644 --- a/src/qamqp/amqp_queue_p.h +++ b/src/qamqp/amqp_queue_p.h @@ -3,6 +3,7 @@ #include "amqp_channel_p.h" #define METHOD_ID_ENUM(name, id) name = id, name ## Ok +#include namespace QAMQP { @@ -54,13 +55,12 @@ namespace QAMQP QString consumerTag; QMap delayedBindings; - + QQueue messages_; bool recievingMessage; void _q_content(const QAMQP::Frame::Content & frame); void _q_body(int channeNumber, const QByteArray & body); }; - } #endif // amqp_queue_p_h__ \ No newline at end of file