diff --git a/src/amqp_exchange.cpp b/src/amqp_exchange.cpp index dc7b94c..3ed4cff 100644 --- a/src/amqp_exchange.cpp +++ b/src/amqp_exchange.cpp @@ -119,25 +119,59 @@ void Exchange::bind(const QString &queueName, const QString &key) qWarning("Not implemented"); } -void Exchange::publish(const QString &message, const QString &key, const MessageProperties &prop) +void Exchange::publish(const QString &message, const QString &key, + const MessageProperties &properties) { - Q_D(Exchange); - d->publish(message.toUtf8(), key, QLatin1String("text.plain"), QVariantHash(), prop); + publish(message.toUtf8(), key, QLatin1String("text.plain"), QVariantHash(), properties); } void Exchange::publish(const QByteArray &message, const QString &key, - const QString &mimeType, const MessageProperties &prop) + const QString &mimeType, const MessageProperties &properties) { - Q_D(Exchange); - d->publish(message, key, mimeType, QVariantHash(), prop); + publish(message, key, mimeType, QVariantHash(), properties); } void Exchange::publish(const QByteArray &message, const QString &key, - const QVariantHash &headers, const QString &mimeType, - const MessageProperties &prop) + const QString &mimeType, const QVariantHash &headers, + const Exchange::MessageProperties &properties) { Q_D(Exchange); - d->publish(message, key, mimeType, headers, prop); + Frame::Method frame(Frame::fcBasic, ExchangePrivate::bmPublish); + frame.setChannel(d->number); + + QByteArray arguments; + QDataStream out(&arguments, QIODevice::WriteOnly); + + out << qint16(0); //reserver 1 + Frame::writeField('s', out, d->name); + Frame::writeField('s', out, key); + out << qint8(0); + + frame.setArguments(arguments); + d->sendFrame(frame); + + Frame::Content content(Frame::fcBasic); + content.setChannel(d->number); + content.setProperty(Frame::Content::cpContentType, mimeType); + content.setProperty(Frame::Content::cpContentEncoding, "utf-8"); + content.setProperty(Frame::Content::cpHeaders, headers); + content.setProperty(Frame::Content::cpMessageId, "0"); + + Exchange::MessageProperties::ConstIterator it; + Exchange::MessageProperties::ConstIterator itEnd = properties.constEnd(); + for (it = properties.constBegin(); it != itEnd; ++it) + content.setProperty(it.key(), it.value()); + content.setBody(message); + d->sendFrame(content); + + int fullSize = message.size(); + for (int sent = 0; sent < fullSize; sent += (FRAME_MAX - 7)) { + Frame::ContentBody body; + QByteArray partition = message.mid(sent, (FRAME_MAX - 7)); + body.setChannel(d->number); + body.setBody(partition); + d->sendFrame(body); + } } ////////////////////////////////////////////////////////////////////////// @@ -149,10 +183,6 @@ ExchangePrivate::ExchangePrivate(Exchange *q) { } -ExchangePrivate::~ExchangePrivate() -{ -} - bool ExchangePrivate::_q_method(const Frame::Method &frame) { if (ChannelPrivate::_q_method(frame)) @@ -178,61 +208,21 @@ bool ExchangePrivate::_q_method(const Frame::Method &frame) void ExchangePrivate::declareOk(const Frame::Method &frame) { Q_UNUSED(frame) + Q_Q(Exchange); qDebug() << "Declared exchange: " << name; declared = true; - QMetaObject::invokeMethod(q, "declared"); + Q_EMIT q->declared(); } void ExchangePrivate::deleteOk(const Frame::Method &frame) { Q_UNUSED(frame) + Q_Q(Exchange); qDebug() << "Deleted exchange: " << name; declared = false; - QMetaObject::invokeMethod(q, "removed"); -} - -void ExchangePrivate::publish(const QByteArray &message, const QString &key, - const QString &mimeType, const QVariantHash &headers, - const Exchange::MessageProperties &prop) -{ - Frame::Method frame(Frame::fcBasic, bmPublish); - frame.setChannel(number); - QByteArray arguments_; - QDataStream out(&arguments_, QIODevice::WriteOnly); - - out << qint16(0); //reserver 1 - Frame::writeField('s', out, name); - Frame::writeField('s', out, key); - out << qint8(0); - - frame.setArguments(arguments_); - sendFrame(frame); - - Frame::Content content(Frame::fcBasic); - content.setChannel(number); - content.setProperty(Frame::Content::cpContentType, mimeType); - content.setProperty(Frame::Content::cpContentEncoding, "utf-8"); - content.setProperty(Frame::Content::cpHeaders, headers); - content.setProperty(Frame::Content::cpMessageId, "0"); - - Exchange::MessageProperties::ConstIterator it; - Exchange::MessageProperties::ConstIterator itEnd = prop.constEnd(); - for (it = prop.constBegin(); it != itEnd; ++it) - content.setProperty(it.key(), it.value()); - - content.setBody(message); - sendFrame(content); - - int fullSize = message.size(); - for (int sended_ = 0; sended_ < fullSize; sended_+= (FRAME_MAX - 7)) { - Frame::ContentBody body; - QByteArray partition_ = message.mid(sended_, (FRAME_MAX - 7)); - body.setChannel(number); - body.setBody(partition_); - sendFrame(body); - } + Q_EMIT q->removed(); } void ExchangePrivate::_q_disconnected() diff --git a/src/amqp_exchange.h b/src/amqp_exchange.h index 063f3cd..4636a03 100644 --- a/src/amqp_exchange.h +++ b/src/amqp_exchange.h @@ -45,11 +45,12 @@ public: void bind(const QString &queueName, const QString &key); void publish(const QString &message, const QString &key, - const MessageProperties &property = MessageProperties()); + const MessageProperties &properties = MessageProperties()); void publish(const QByteArray &message, const QString &key, - const QString &mimeType, const MessageProperties &property = MessageProperties()); - void publish(const QByteArray &message, const QString &key, const QVariantHash &headers, - const QString &mimeType, const MessageProperties &property = MessageProperties()); + const QString &mimeType, const MessageProperties &properties = MessageProperties()); + void publish(const QByteArray &message, const QString &key, + const QString &mimeType, const QVariantHash &headers, + const Exchange::MessageProperties &properties = Exchange::MessageProperties()); Q_SIGNALS: void declared(); diff --git a/src/amqp_exchange_p.h b/src/amqp_exchange_p.h index afea52d..f748040 100644 --- a/src/amqp_exchange_p.h +++ b/src/amqp_exchange_p.h @@ -15,12 +15,6 @@ public: }; ExchangePrivate(Exchange *q); - ~ExchangePrivate(); - - void publish(const QByteArray &message, const QString &key, - const QString &mimeType = QLatin1String("text/plain"), - const QVariantHash &headers = QVariantHash(), - const Exchange::MessageProperties &properties = Exchange::MessageProperties()); // method handler related virtual void _q_disconnected();