diff --git a/src/qamqp/amqp.cpp b/src/qamqp/amqp.cpp index 752e12a..392a60d 100644 --- a/src/qamqp/amqp.cpp +++ b/src/qamqp/amqp.cpp @@ -130,6 +130,7 @@ Exchange * ClientPrivate::createExchange(int channelNumber, const QString &name exchange_, SLOT(_q_method(const QAMQP::Frame::Method &))); QObject::connect(connection_, SIGNAL(connected()), exchange_, SLOT(_q_open())); + QObject::connect(connection_, SIGNAL(disconnected()), exchange_, SLOT(_q_disconnected())); exchange_->setName(name); return exchange_; } diff --git a/src/qamqp/amqp_channel.cpp b/src/qamqp/amqp_channel.cpp index 5470928..0697db6 100644 --- a/src/qamqp/amqp_channel.cpp +++ b/src/qamqp/amqp_channel.cpp @@ -193,7 +193,10 @@ void ChannelPrivate::_q_open() void ChannelPrivate::sendFrame( const QAMQP::Frame::Base & frame ) { - client_->pd_func()->network_->sendFrame(frame); + if(client_) + { + client_->pd_func()->network_->sendFrame(frame); + } } @@ -301,4 +304,11 @@ void ChannelPrivate::openOk( const QAMQP::Frame::Method & frame ) void ChannelPrivate::setQOS( qint32 prefetchSize, quint16 prefetchCount ) { client_->pd_func()->connection_->pd_func()->setQOS(prefetchSize, prefetchCount, number, false); -} \ No newline at end of file +} + + +void ChannelPrivate::_q_disconnected() +{ + nextChannelNumber_ = 0; + opened = false; +} diff --git a/src/qamqp/amqp_channel.h b/src/qamqp/amqp_channel.h index 3d378f7..ac7aaf6 100644 --- a/src/qamqp/amqp_channel.h +++ b/src/qamqp/amqp_channel.h @@ -49,6 +49,7 @@ namespace QAMQP friend class ClientPrivate; Q_PRIVATE_SLOT(pd_func(), void _q_open()) Q_PRIVATE_SLOT(pd_func(), void _q_method(const QAMQP::Frame::Method & frame)) + Q_PRIVATE_SLOT(pd_func(), void _q_disconnected()) }; } diff --git a/src/qamqp/amqp_channel_p.h b/src/qamqp/amqp_channel_p.h index 473f851..0450fdf 100644 --- a/src/qamqp/amqp_channel_p.h +++ b/src/qamqp/amqp_channel_p.h @@ -64,6 +64,7 @@ namespace QAMQP void closeOk(const QAMQP::Frame::Method & frame); virtual bool _q_method(const QAMQP::Frame::Method & frame); + virtual void _q_disconnected(); void _q_open(); void setQOS(qint32 prefetchSize, quint16 prefetchCount); diff --git a/src/qamqp/amqp_connection.cpp b/src/qamqp/amqp_connection.cpp index d6bbe9a..a224d83 100644 --- a/src/qamqp/amqp_connection.cpp +++ b/src/qamqp/amqp_connection.cpp @@ -59,9 +59,9 @@ void ConnectionPrivate::startOk() QDataStream stream(&arguments_, QIODevice::WriteOnly); QAMQP::Frame::TableField clientProperties; - clientProperties["version"] = "0.0.3"; + clientProperties["version"] = QString("0.0.3"); clientProperties["platform"] = QString("Qt %1").arg(qVersion()); - clientProperties["product"] = "QAMQP"; + clientProperties["product"] = QString("QAMQP"); QAMQP::Frame::serialize(stream, clientProperties); client_->pd_func()->auth_->write(stream); @@ -185,6 +185,7 @@ void ConnectionPrivate::close( const QAMQP::Frame::Method & frame ) qDebug(">> class-id: %d", classId); qDebug(">> method-id: %d", methodId); connected = false; + QMetaObject::invokeMethod(pq_func(), "disconnected"); } void ConnectionPrivate::close(int code, const QString & text, int classId, int methodId) diff --git a/src/qamqp/amqp_exchange.cpp b/src/qamqp/amqp_exchange.cpp index fee987f..0401de5 100644 --- a/src/qamqp/amqp_exchange.cpp +++ b/src/qamqp/amqp_exchange.cpp @@ -26,6 +26,8 @@ namespace QAMQP #endif } }; + + } Exchange::Exchange(int channelNumber, Client * parent /*= 0*/ ) @@ -113,6 +115,10 @@ void Exchange::publish( const QByteArray & message, const QString & key, const Q pd_func()->publish(message, key, mimeType); } +void Exchange::publish( const QByteArray & message, const QString & key, const QVariantHash &headers, const QString &mimeType ) +{ + pd_func()->publish(message, key, mimeType, headers); +} ////////////////////////////////////////////////////////////////////////// @@ -215,7 +221,7 @@ void ExchangePrivate::remove( bool ifUnused /*= true*/, bool noWait /*= true*/ ) sendFrame(frame); } -void ExchangePrivate::publish( const QByteArray & message, const QString & key, const QString &mimeType /*= QString::fromLatin1("text/plain")*/ ) +void ExchangePrivate::publish( const QByteArray & message, const QString & key, const QString &mimeType /*= QString::fromLatin1("text/plain")*/, const QVariantHash & headers ) { QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmPublish); frame.setChannel(number); @@ -235,7 +241,8 @@ void ExchangePrivate::publish( const QByteArray & message, const QString & key, content.setChannel(number); content.setProperty(Content::cpContentType, mimeType); content.setProperty(Content::cpContentEncoding, "utf-8"); - content.setProperty(Content::cpMessageId, "0"); + content.setProperty(Content::cpHeaders, headers); + content.setProperty(Content::cpMessageId, "0"); content.setBody(message); sendFrame(content); @@ -249,4 +256,13 @@ void ExchangePrivate::publish( const QByteArray & message, const QString & key, sendFrame(body); } -} \ No newline at end of file +} + + +void ExchangePrivate::_q_disconnected() +{ + ChannelPrivate::_q_disconnected(); + qDebug() << "Exchange " << name << " disconnected"; + deleyedDeclare = false; + declared = false; +} diff --git a/src/qamqp/amqp_exchange.h b/src/qamqp/amqp_exchange.h index a86b92b..f8318f4 100644 --- a/src/qamqp/amqp_exchange.h +++ b/src/qamqp/amqp_exchange.h @@ -51,6 +51,7 @@ namespace QAMQP void publish(const QString & message, const QString & key); void publish(const QByteArray & message, const QString & key, const QString &mimeType); + void publish(const QByteArray & message, const QString & key, const QVariantHash &headers, const QString &mimeType); Q_SIGNALS: void declared(); diff --git a/src/qamqp/amqp_exchange_p.h b/src/qamqp/amqp_exchange_p.h index 41e78b9..3d5d9cd 100644 --- a/src/qamqp/amqp_exchange_p.h +++ b/src/qamqp/amqp_exchange_p.h @@ -24,13 +24,14 @@ namespace QAMQP void declareOk(const QAMQP::Frame::Method & frame); void deleteOk(const QAMQP::Frame::Method & frame); - void publish(const QByteArray & message, const QString & key, const QString &mimeType = QString::fromLatin1("text/plain")); + void publish(const QByteArray & message, const QString & key, const QString &mimeType = QString::fromLatin1("text/plain"), const QVariantHash & headers = QVariantHash()); QString type; Exchange::ExchangeOptions options; TableField arguments; bool _q_method(const QAMQP::Frame::Method & frame); + void _q_disconnected(); bool deleyedDeclare; bool declared; diff --git a/src/qamqp/amqp_frame.cpp b/src/qamqp/amqp_frame.cpp index 98459c9..1601b73 100644 --- a/src/qamqp/amqp_frame.cpp +++ b/src/qamqp/amqp_frame.cpp @@ -423,6 +423,9 @@ void QAMQP::Frame::writeField( QDataStream &s, const QVariant & value ) case QVariant::Bool: type = 't'; break; + case QVariant::ByteArray: + type = 'S'; + break; case QVariant::Int: { int i = qAbs(value.toInt()); @@ -453,12 +456,7 @@ void QAMQP::Frame::writeField( QDataStream &s, const QVariant & value ) case QVariant::ULongLong: type = 'l'; break; - case QVariant::String: - /* - { - QString str = value.toString(); - type = str.length() > 255 ? 'S' : 's'; - }*/ + case QVariant::String: type = 'S'; break; case QVariant::DateTime: @@ -516,16 +514,48 @@ qint32 QAMQP::Frame::Content::size() const } out << prop_; - QHash::const_iterator i; - for(i = properties_.begin(); i != properties_.end(); ++i) - { - if(i.value().type() == QVariant::String) - { - writeField('s', out, i.value()); - } else { - writeField(out, i.value()); - } - } + + if(prop_ & cpContentType) + writeField('s', out, properties_[cpContentType]); + + if(prop_ & cpContentEncoding) + writeField('s', out, properties_[cpContentEncoding]); + + if(prop_ & cpHeaders) + writeField('F', out, properties_[cpHeaders]); + + if(prop_ & cpDeliveryMode) + writeField('b', out, properties_[cpDeliveryMode]); + + if(prop_ & cpPriority) + writeField('b', out, properties_[cpPriority]); + + if(prop_ & cpCorrelationId) + writeField('s', out, properties_[cpCorrelationId]); + + if(prop_ & cpReplyTo) + writeField('s', out, properties_[cpReplyTo]); + + if(prop_ & cpExpiration) + writeField('s', out, properties_[cpExpiration]); + + if(prop_ & cpMessageId) + writeField('s', out, properties_[cpMessageId]); + + if(prop_ & cpTimestamp) + writeField('T', out, properties_[cpTimestamp]); + + if(prop_ & cpType) + writeField('s', out, properties_[cpType]); + + if(prop_ & cpUserId) + writeField('s', out, properties_[cpUserId]); + + if(prop_ & cpAppId) + writeField('s', out, properties_[cpAppId]); + + if(prop_ & cpClusterID) + writeField('s', out, properties_[cpClusterID]); return buffer_.size(); } @@ -569,7 +599,7 @@ void QAMQP::Frame::Content::readPayload( QDataStream & in ) properties_[cpContentEncoding] = readField('s', in); if(flags_ & cpHeaders) - properties_[cpHeaders] = readField('f', in); + properties_[cpHeaders] = readField('F', in); if(flags_ & cpDeliveryMode) properties_[cpDeliveryMode] = readField('b', in); diff --git a/src/qamqp/amqp_network.cpp b/src/qamqp/amqp_network.cpp index 08202ed..a34c500 100644 --- a/src/qamqp/amqp_network.cpp +++ b/src/qamqp/amqp_network.cpp @@ -146,7 +146,7 @@ void QAMQP::Network::readyRead() void QAMQP::Network::sendFrame( const QAMQP::Frame::Base & frame ) { - if(socket_->state() == QAbstractSocket::Connected) + if(socket_->state() == QAbstractSocket::ConnectedState) { QDataStream stream(socket_); frame.toStream(stream);