+ headers for Exchange::publish method

+ redeclare Exchange after reconnection

* fixed Frame::Content (de)serialize
This commit is contained in:
fuCtor 2012-05-24 01:50:36 -07:00
parent 71607a7635
commit fd64ee6a59
10 changed files with 88 additions and 26 deletions

View File

@ -130,6 +130,7 @@ Exchange * ClientPrivate::createExchange(int channelNumber, const QString &name
exchange_, SLOT(_q_method(const QAMQP::Frame::Method &))); exchange_, SLOT(_q_method(const QAMQP::Frame::Method &)));
QObject::connect(connection_, SIGNAL(connected()), exchange_, SLOT(_q_open())); QObject::connect(connection_, SIGNAL(connected()), exchange_, SLOT(_q_open()));
QObject::connect(connection_, SIGNAL(disconnected()), exchange_, SLOT(_q_disconnected()));
exchange_->setName(name); exchange_->setName(name);
return exchange_; return exchange_;
} }

View File

@ -193,7 +193,10 @@ void ChannelPrivate::_q_open()
void ChannelPrivate::sendFrame( const QAMQP::Frame::Base & frame ) void ChannelPrivate::sendFrame( const QAMQP::Frame::Base & frame )
{ {
client_->pd_func()->network_->sendFrame(frame); if(client_)
{
client_->pd_func()->network_->sendFrame(frame);
}
} }
@ -301,4 +304,11 @@ void ChannelPrivate::openOk( const QAMQP::Frame::Method & frame )
void ChannelPrivate::setQOS( qint32 prefetchSize, quint16 prefetchCount ) void ChannelPrivate::setQOS( qint32 prefetchSize, quint16 prefetchCount )
{ {
client_->pd_func()->connection_->pd_func()->setQOS(prefetchSize, prefetchCount, number, false); client_->pd_func()->connection_->pd_func()->setQOS(prefetchSize, prefetchCount, number, false);
} }
void ChannelPrivate::_q_disconnected()
{
nextChannelNumber_ = 0;
opened = false;
}

View File

@ -49,6 +49,7 @@ namespace QAMQP
friend class ClientPrivate; friend class ClientPrivate;
Q_PRIVATE_SLOT(pd_func(), void _q_open()) Q_PRIVATE_SLOT(pd_func(), void _q_open())
Q_PRIVATE_SLOT(pd_func(), void _q_method(const QAMQP::Frame::Method & frame)) Q_PRIVATE_SLOT(pd_func(), void _q_method(const QAMQP::Frame::Method & frame))
Q_PRIVATE_SLOT(pd_func(), void _q_disconnected())
}; };
} }

View File

@ -64,6 +64,7 @@ namespace QAMQP
void closeOk(const QAMQP::Frame::Method & frame); void closeOk(const QAMQP::Frame::Method & frame);
virtual bool _q_method(const QAMQP::Frame::Method & frame); virtual bool _q_method(const QAMQP::Frame::Method & frame);
virtual void _q_disconnected();
void _q_open(); void _q_open();
void setQOS(qint32 prefetchSize, quint16 prefetchCount); void setQOS(qint32 prefetchSize, quint16 prefetchCount);

View File

@ -59,9 +59,9 @@ void ConnectionPrivate::startOk()
QDataStream stream(&arguments_, QIODevice::WriteOnly); QDataStream stream(&arguments_, QIODevice::WriteOnly);
QAMQP::Frame::TableField clientProperties; QAMQP::Frame::TableField clientProperties;
clientProperties["version"] = "0.0.3"; clientProperties["version"] = QString("0.0.3");
clientProperties["platform"] = QString("Qt %1").arg(qVersion()); clientProperties["platform"] = QString("Qt %1").arg(qVersion());
clientProperties["product"] = "QAMQP"; clientProperties["product"] = QString("QAMQP");
QAMQP::Frame::serialize(stream, clientProperties); QAMQP::Frame::serialize(stream, clientProperties);
client_->pd_func()->auth_->write(stream); client_->pd_func()->auth_->write(stream);
@ -185,6 +185,7 @@ void ConnectionPrivate::close( const QAMQP::Frame::Method & frame )
qDebug(">> class-id: %d", classId); qDebug(">> class-id: %d", classId);
qDebug(">> method-id: %d", methodId); qDebug(">> method-id: %d", methodId);
connected = false; connected = false;
QMetaObject::invokeMethod(pq_func(), "disconnected");
} }
void ConnectionPrivate::close(int code, const QString & text, int classId, int methodId) void ConnectionPrivate::close(int code, const QString & text, int classId, int methodId)

View File

@ -26,6 +26,8 @@ namespace QAMQP
#endif #endif
} }
}; };
} }
Exchange::Exchange(int channelNumber, Client * parent /*= 0*/ ) Exchange::Exchange(int channelNumber, Client * parent /*= 0*/ )
@ -113,6 +115,10 @@ void Exchange::publish( const QByteArray & message, const QString & key, const Q
pd_func()->publish(message, key, mimeType); pd_func()->publish(message, key, mimeType);
} }
void Exchange::publish( const QByteArray & message, const QString & key, const QVariantHash &headers, const QString &mimeType )
{
pd_func()->publish(message, key, mimeType, headers);
}
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
@ -215,7 +221,7 @@ void ExchangePrivate::remove( bool ifUnused /*= true*/, bool noWait /*= true*/ )
sendFrame(frame); sendFrame(frame);
} }
void ExchangePrivate::publish( const QByteArray & message, const QString & key, const QString &mimeType /*= QString::fromLatin1("text/plain")*/ ) void ExchangePrivate::publish( const QByteArray & message, const QString & key, const QString &mimeType /*= QString::fromLatin1("text/plain")*/, const QVariantHash & headers )
{ {
QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmPublish); QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmPublish);
frame.setChannel(number); frame.setChannel(number);
@ -235,7 +241,8 @@ void ExchangePrivate::publish( const QByteArray & message, const QString & key,
content.setChannel(number); content.setChannel(number);
content.setProperty(Content::cpContentType, mimeType); content.setProperty(Content::cpContentType, mimeType);
content.setProperty(Content::cpContentEncoding, "utf-8"); content.setProperty(Content::cpContentEncoding, "utf-8");
content.setProperty(Content::cpMessageId, "0"); content.setProperty(Content::cpHeaders, headers);
content.setProperty(Content::cpMessageId, "0");
content.setBody(message); content.setBody(message);
sendFrame(content); sendFrame(content);
@ -249,4 +256,13 @@ void ExchangePrivate::publish( const QByteArray & message, const QString & key,
sendFrame(body); sendFrame(body);
} }
} }
void ExchangePrivate::_q_disconnected()
{
ChannelPrivate::_q_disconnected();
qDebug() << "Exchange " << name << " disconnected";
deleyedDeclare = false;
declared = false;
}

View File

@ -51,6 +51,7 @@ namespace QAMQP
void publish(const QString & message, const QString & key); void publish(const QString & message, const QString & key);
void publish(const QByteArray & message, const QString & key, const QString &mimeType); void publish(const QByteArray & message, const QString & key, const QString &mimeType);
void publish(const QByteArray & message, const QString & key, const QVariantHash &headers, const QString &mimeType);
Q_SIGNALS: Q_SIGNALS:
void declared(); void declared();

View File

@ -24,13 +24,14 @@ namespace QAMQP
void declareOk(const QAMQP::Frame::Method & frame); void declareOk(const QAMQP::Frame::Method & frame);
void deleteOk(const QAMQP::Frame::Method & frame); void deleteOk(const QAMQP::Frame::Method & frame);
void publish(const QByteArray & message, const QString & key, const QString &mimeType = QString::fromLatin1("text/plain")); void publish(const QByteArray & message, const QString & key, const QString &mimeType = QString::fromLatin1("text/plain"), const QVariantHash & headers = QVariantHash());
QString type; QString type;
Exchange::ExchangeOptions options; Exchange::ExchangeOptions options;
TableField arguments; TableField arguments;
bool _q_method(const QAMQP::Frame::Method & frame); bool _q_method(const QAMQP::Frame::Method & frame);
void _q_disconnected();
bool deleyedDeclare; bool deleyedDeclare;
bool declared; bool declared;

View File

@ -423,6 +423,9 @@ void QAMQP::Frame::writeField( QDataStream &s, const QVariant & value )
case QVariant::Bool: case QVariant::Bool:
type = 't'; type = 't';
break; break;
case QVariant::ByteArray:
type = 'S';
break;
case QVariant::Int: case QVariant::Int:
{ {
int i = qAbs(value.toInt()); int i = qAbs(value.toInt());
@ -453,12 +456,7 @@ void QAMQP::Frame::writeField( QDataStream &s, const QVariant & value )
case QVariant::ULongLong: case QVariant::ULongLong:
type = 'l'; type = 'l';
break; break;
case QVariant::String: case QVariant::String:
/*
{
QString str = value.toString();
type = str.length() > 255 ? 'S' : 's';
}*/
type = 'S'; type = 'S';
break; break;
case QVariant::DateTime: case QVariant::DateTime:
@ -516,16 +514,48 @@ qint32 QAMQP::Frame::Content::size() const
} }
out << prop_; out << prop_;
QHash<int, QVariant>::const_iterator i;
for(i = properties_.begin(); i != properties_.end(); ++i) if(prop_ & cpContentType)
{ writeField('s', out, properties_[cpContentType]);
if(i.value().type() == QVariant::String)
{ if(prop_ & cpContentEncoding)
writeField('s', out, i.value()); writeField('s', out, properties_[cpContentEncoding]);
} else {
writeField(out, i.value()); if(prop_ & cpHeaders)
} writeField('F', out, properties_[cpHeaders]);
}
if(prop_ & cpDeliveryMode)
writeField('b', out, properties_[cpDeliveryMode]);
if(prop_ & cpPriority)
writeField('b', out, properties_[cpPriority]);
if(prop_ & cpCorrelationId)
writeField('s', out, properties_[cpCorrelationId]);
if(prop_ & cpReplyTo)
writeField('s', out, properties_[cpReplyTo]);
if(prop_ & cpExpiration)
writeField('s', out, properties_[cpExpiration]);
if(prop_ & cpMessageId)
writeField('s', out, properties_[cpMessageId]);
if(prop_ & cpTimestamp)
writeField('T', out, properties_[cpTimestamp]);
if(prop_ & cpType)
writeField('s', out, properties_[cpType]);
if(prop_ & cpUserId)
writeField('s', out, properties_[cpUserId]);
if(prop_ & cpAppId)
writeField('s', out, properties_[cpAppId]);
if(prop_ & cpClusterID)
writeField('s', out, properties_[cpClusterID]);
return buffer_.size(); return buffer_.size();
} }
@ -569,7 +599,7 @@ void QAMQP::Frame::Content::readPayload( QDataStream & in )
properties_[cpContentEncoding] = readField('s', in); properties_[cpContentEncoding] = readField('s', in);
if(flags_ & cpHeaders) if(flags_ & cpHeaders)
properties_[cpHeaders] = readField('f', in); properties_[cpHeaders] = readField('F', in);
if(flags_ & cpDeliveryMode) if(flags_ & cpDeliveryMode)
properties_[cpDeliveryMode] = readField('b', in); properties_[cpDeliveryMode] = readField('b', in);

View File

@ -146,7 +146,7 @@ void QAMQP::Network::readyRead()
void QAMQP::Network::sendFrame( const QAMQP::Frame::Base & frame ) void QAMQP::Network::sendFrame( const QAMQP::Frame::Base & frame )
{ {
if(socket_->state() == QAbstractSocket::Connected) if(socket_->state() == QAbstractSocket::ConnectedState)
{ {
QDataStream stream(socket_); QDataStream stream(socket_);
frame.toStream(stream); frame.toStream(stream);