[+] Отправка/получение сообщений произвольного размера.

This commit is contained in:
fuCtor 2012-02-19 00:55:29 -08:00
parent 19a0496712
commit 9874e0af26
8 changed files with 116 additions and 12 deletions

View File

@ -104,6 +104,13 @@ void Exchange::publish( const QString & message, const QString & key )
d_func()->publish(message.toUtf8(), key); d_func()->publish(message.toUtf8(), key);
} }
void Exchange::publish( const QByteArray & message, const QString & key, const QString &mimeType )
{
d_func()->publish(message, key, mimeType);
}
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
@ -221,15 +228,20 @@ void ExchangePrivate::publish( const QByteArray & message, const QString & key,
QAMQP::Frame::Content content(QAMQP::Frame::fcBasic); QAMQP::Frame::Content content(QAMQP::Frame::fcBasic);
content.setChannel(number); content.setChannel(number);
content.setProperty(Content::cpContentType, "text/plain"); content.setProperty(Content::cpContentType, mimeType);
content.setProperty(Content::cpContentEncoding, "utf-8"); content.setProperty(Content::cpContentEncoding, "utf-8");
content.setProperty(Content::cpMessageId, "0"); content.setProperty(Content::cpMessageId, "0");
content.setBody(message); content.setBody(message);
sendFrame(content); sendFrame(content);
int fullSize = message.size();
for (int sended_ = 0; sended_ < fullSize; sended_+= (FRAME_MAX - 7))
{
QAMQP::Frame::ContentBody body; QAMQP::Frame::ContentBody body;
QByteArray partition_ = message.mid(sended_, (FRAME_MAX - 7));
body.setChannel(number); body.setChannel(number);
body.setBody(message); body.setBody(partition_);
sendFrame(body); sendFrame(body);
} }
}

View File

@ -615,6 +615,11 @@ qlonglong QAMQP::Frame::Content::bodySize() const
ContentBody::ContentBody() : Base(ftBody) ContentBody::ContentBody() : Base(ftBody)
{} {}
QAMQP::Frame::ContentBody::ContentBody( QDataStream& raw ): Base(raw)
{
readPayload(raw);
}
void QAMQP::Frame::ContentBody::setBody( const QByteArray & data ) void QAMQP::Frame::ContentBody::setBody( const QByteArray & data )
{ {
body_ = data; body_ = data;
@ -632,7 +637,8 @@ void QAMQP::Frame::ContentBody::writePayload( QDataStream & out ) const
void QAMQP::Frame::ContentBody::readPayload( QDataStream & in ) void QAMQP::Frame::ContentBody::readPayload( QDataStream & in )
{ {
body_.resize(size_);
in.readRawData(body_.data(), body_.size());
} }
qint32 QAMQP::Frame::ContentBody::size() const qint32 QAMQP::Frame::ContentBody::size() const

View File

@ -22,6 +22,7 @@
namespace QAMQP namespace QAMQP
{ {
class QueuePrivate;
namespace Frame namespace Frame
{ {
enum Type enum Type
@ -109,6 +110,7 @@ namespace QAMQP
class Content : public Base class Content : public Base
{ {
friend class QAMQP::QueuePrivate;
public: public:
enum Property enum Property
@ -158,6 +160,7 @@ namespace QAMQP
{ {
public: public:
ContentBody(); ContentBody();
ContentBody(QDataStream& raw);
void setBody(const QByteArray & data); void setBody(const QByteArray & data);
QByteArray body() const; QByteArray body() const;
qint32 size() const; qint32 size() const;

View File

@ -0,0 +1,26 @@
#include "amqp_frame.h"
#include <QByteArray>
#include <QHash>
#include <QSharedPointer>
namespace QAMQP
{
struct Message
{
Message()
{
leftSize = 0;
}
typedef QAMQP::Frame::Content::Property MessageProperty;
Q_DECLARE_FLAGS(MessageProperties, MessageProperty);
QByteArray payload;
QHash<MessageProperty, QVariant> property;
QAMQP::Frame::TableField headers;
QString routeKey;
QString exchangeName;
int leftSize;
};
typedef QSharedPointer<QAMQP::Message> MessagePtr;
}

View File

@ -101,6 +101,10 @@ void QAMQP::Network::readyRead()
} }
break; break;
case QAMQP::Frame::ftBody: case QAMQP::Frame::ftBody:
{
QAMQP::Frame::ContentBody frame(streamB);
emit body(frame.channel(), frame.body());
}
break; break;
default: default:
qWarning("Unknown frame type"); qWarning("Unknown frame type");

View File

@ -8,6 +8,7 @@ using namespace QAMQP::Frame;
#include <QCoreApplication> #include <QCoreApplication>
#include <QDebug> #include <QDebug>
#include <QDataStream> #include <QDataStream>
#include <QFile>
namespace QAMQP namespace QAMQP
{ {
@ -120,9 +121,21 @@ void Queue::unbind( Exchange * exchange, const QString & key )
d_func()->unbind(exchange->name(), key); d_func()->unbind(exchange->name(), key);
} }
void Queue::get()
QAMQP::MessagePtr Queue::getMessage()
{
return d_func()->messages_.dequeue();
}
bool Queue::hasMessage() const
{ {
if(d_func()->messages_.isEmpty())
{
return false;
}
const MessagePtr &q = d_func()->messages_.head();
return q->leftSize == 0;
} }
void Queue::consume(ConsumeOptions options) void Queue::consume(ConsumeOptions options)
@ -418,17 +431,53 @@ void QueuePrivate::deliver( const QAMQP::Frame::Method & frame )
qDebug() << "| Redelivered: " << redelivered; qDebug() << "| Redelivered: " << redelivered;
qDebug("| Exchange-name: %s", qPrintable(exchangeName)); qDebug("| Exchange-name: %s", qPrintable(exchangeName));
qDebug("| Routing-key: %s", qPrintable(routingKey)); qDebug("| Routing-key: %s", qPrintable(routingKey));
MessagePtr newMessage = MessagePtr(new Message);
newMessage->routeKey = routingKey;
newMessage->exchangeName = exchangeName;
messages_.enqueue(newMessage);
} }
void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame ) void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame )
{ {
if(frame.channel() != number) if(frame.channel() != number)
return; return;
QFile::remove("dump.jpg");
qDebug() << "Content-type: " << qPrintable(frame.property(Content::cpContentType).toString()); qDebug() << "Content-type: " << qPrintable(frame.property(Content::cpContentType).toString());
qDebug() << "Encoding-type: " << qPrintable(frame.property(Content::cpContentEncoding).toString()); qDebug() << "Encoding-type: " << qPrintable(frame.property(Content::cpContentEncoding).toString());
if(messages_.isEmpty())
{
qErrnoWarning("Received content-header without method frame before");
return;
}
MessagePtr &message = messages_.head();
message->leftSize = frame.bodySize();
QHash<int, QVariant>::ConstIterator i;
for (i = frame.properties_.begin(); i != frame.properties_.end(); ++i)
{
message->property[Message::MessageProperty(i.key())]= i.value();
}
} }
void QueuePrivate::_q_body( int channeNumber, const QByteArray & body ) void QueuePrivate::_q_body( int channeNumber, const QByteArray & body )
{ {
if(channeNumber!= number)
return;
if(messages_.isEmpty())
{
qErrnoWarning("Received content-body without method frame before");
return;
}
MessagePtr &message = messages_.head();
message->payload.append(body);
message->leftSize -= body.size();
int size = message->leftSize;
if(message->leftSize == 0 && messages_.size() == 1)
{
QMetaObject::invokeMethod(q_func(), "messageRecieved");
}
} }

View File

@ -2,6 +2,7 @@
#define amqp_queue_h__ #define amqp_queue_h__
#include "amqp_channel.h" #include "amqp_channel.h"
#include "amqp_message.h"
namespace QAMQP namespace QAMQP
{ {
@ -60,7 +61,8 @@ namespace QAMQP
void unbind(const QString & exchangeName, const QString & key); void unbind(const QString & exchangeName, const QString & key);
void unbind(Exchange * exchange, const QString & key); void unbind(Exchange * exchange, const QString & key);
void get(); MessagePtr getMessage();
bool hasMessage() const;
void consume(ConsumeOptions options = ConsumeOptions(NoOptions)); void consume(ConsumeOptions options = ConsumeOptions(NoOptions));
void setConsumerTag(const QString &consumerTag); void setConsumerTag(const QString &consumerTag);
QString consumerTag() const; QString consumerTag() const;
@ -69,6 +71,8 @@ namespace QAMQP
void declared(); void declared();
void binded(bool); void binded(bool);
void removed(); void removed();
void messageRecieved();
private: private:
Q_PRIVATE_SLOT(d_func(), void _q_content(const QAMQP::Frame::Content & frame)) Q_PRIVATE_SLOT(d_func(), void _q_content(const QAMQP::Frame::Content & frame))
Q_PRIVATE_SLOT(d_func(), void _q_body(int channeNumber, const QByteArray & body)) Q_PRIVATE_SLOT(d_func(), void _q_body(int channeNumber, const QByteArray & body))

View File

@ -3,6 +3,7 @@
#include "amqp_channel_p.h" #include "amqp_channel_p.h"
#define METHOD_ID_ENUM(name, id) name = id, name ## Ok #define METHOD_ID_ENUM(name, id) name = id, name ## Ok
#include <QQueue>
namespace QAMQP namespace QAMQP
{ {
@ -54,13 +55,12 @@ namespace QAMQP
QString consumerTag; QString consumerTag;
QMap<QString, QString> delayedBindings; QMap<QString, QString> delayedBindings;
QQueue<QAMQP::MessagePtr> messages_;
bool recievingMessage; bool recievingMessage;
void _q_content(const QAMQP::Frame::Content & frame); void _q_content(const QAMQP::Frame::Content & frame);
void _q_body(int channeNumber, const QByteArray & body); void _q_body(int channeNumber, const QByteArray & body);
}; };
} }
#endif // amqp_queue_p_h__ #endif // amqp_queue_p_h__