diff --git a/README.md b/README.md index ff2dfc1..31eba28 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ QAMQP ============= -Qt4 implementation of AMQP 0.9.1. +Qt4/Qt5 implementation of AMQP 0.9.1. Implement ------------ diff --git a/src/main.cpp b/src/main.cpp index a204a39..c6b9f99 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -2,7 +2,6 @@ #include "QamqpApp.h" - int main(int argc, char *argv[]) { QAMQP::samples::QamqpApp qamqpApp(argc, argv); diff --git a/src/qamqp/amqp_connection.cpp b/src/qamqp/amqp_connection.cpp index 987baea..7ac94fb 100644 --- a/src/qamqp/amqp_connection.cpp +++ b/src/qamqp/amqp_connection.cpp @@ -4,11 +4,11 @@ #include "amqp_p.h" #include "amqp_frame.h" - #include #include #include #include +#include using namespace QAMQP; @@ -53,6 +53,9 @@ void ConnectionPrivate::init(Client * parent) { pq_func()->setParent(parent); client_ = parent; + heartbeatTimer_ = new QTimer(parent); + QObject::connect(heartbeatTimer_, SIGNAL(timeout()), + pq_func(), SLOT(_q_heartbeat())); } void ConnectionPrivate::startOk() @@ -91,7 +94,7 @@ void ConnectionPrivate::tuneOk() stream << qint16(0); //channel_max stream << qint32(FRAME_MAX); //frame_max - stream << qint16(0); //heartbeat + stream << qint16(heartbeatTimer_->interval() / 1000); //heartbeat frame.setArguments(arguments_); client_->pd_func()->network_->sendFrame(frame); @@ -157,10 +160,21 @@ void ConnectionPrivate::tune( const QAMQP::Frame::Method & frame ) stream >> channel_max; stream >> frame_max; stream >> heartbeat; + qDebug(">> channel_max: %d", channel_max); qDebug(">> frame_max: %d", frame_max); qDebug(">> heartbeat: %d", heartbeat); + if(heartbeatTimer_) + { + heartbeatTimer_->setInterval(heartbeat * 1000); + if(heartbeatTimer_->interval()) + { + heartbeatTimer_->start() + } else { + heartbeatTimer_->stop() + } + } tuneOk(); open(); } @@ -215,12 +229,17 @@ void ConnectionPrivate::closeOk() QAMQP::Frame::Method frame(QAMQP::Frame::fcConnection, miCloseOk); connected = false; client_->pd_func()->network_->sendFrame(frame); + } void ConnectionPrivate::closeOk( const QAMQP::Frame::Method & ) { connected = false; QMetaObject::invokeMethod(pq_func(), "disconnected"); + if(heartbeatTimer_) + { + heartbeatTimer_->stop() + } } @@ -239,7 +258,6 @@ void ConnectionPrivate::setQOS( qint32 prefetchSize, quint16 prefetchCount, int client_->pd_func()->network_->sendFrame(frame); } - bool ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame ) { if(frame.methodClass() != QAMQP::Frame::fcConnection) @@ -281,6 +299,12 @@ bool ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame ) return true; } +void ConnectionPrivate::_q_heartbeat() +{ + QAMQP::Frame::Heartbeat frame; + client_->pd_func()->network_->sendFrame(frame); +} + ////////////////////////////////////////////////////////////////////////// Connection::Connection( Client * parent /*= 0*/ ) diff --git a/src/qamqp/amqp_connection.h b/src/qamqp/amqp_connection.h index 703f133..3ab0e6d 100644 --- a/src/qamqp/amqp_connection.h +++ b/src/qamqp/amqp_connection.h @@ -46,6 +46,7 @@ namespace QAMQP friend class ClientPrivate; friend class ChannelPrivate; Q_PRIVATE_SLOT(pd_func(), void _q_method(const QAMQP::Frame::Method & frame)) + Q_PRIVATE_SLOT(pd_func(), void _q_heartbeat()) }; } diff --git a/src/qamqp/amqp_connection_p.h b/src/qamqp/amqp_connection_p.h index 1fe0f74..cbf2032 100644 --- a/src/qamqp/amqp_connection_p.h +++ b/src/qamqp/amqp_connection_p.h @@ -5,6 +5,8 @@ #include +class QTimer; + namespace QAMQP { class Client; @@ -39,12 +41,14 @@ namespace QAMQP void close(const QAMQP::Frame::Method & frame); void closeOk(const QAMQP::Frame::Method & frame); bool _q_method(const QAMQP::Frame::Method & frame); + void _q_heartbeat(); void setQOS(qint32 prefetchSize, quint16 prefetchCount, int channel, bool global); QPointer client_; bool closed_; bool connected; + QPointer heartbeatTimer_; Connection * const pq_ptr; diff --git a/src/qamqp/amqp_frame.cpp b/src/qamqp/amqp_frame.cpp index fc02ce4..884a750 100644 --- a/src/qamqp/amqp_frame.cpp +++ b/src/qamqp/amqp_frame.cpp @@ -151,7 +151,7 @@ QVariant QAMQP::Frame::readField( qint8 valueType, QDataStream &s ) { QVariant value; QByteArray tmp; - qint8 nameSize_; + qint8 nameSize_ = 0; char octet = 0; switch(valueType) @@ -238,7 +238,11 @@ QVariant QAMQP::Frame::readField( qint8 valueType, QDataStream &s ) s >> nameSize_; tmp.resize(nameSize_); s.readRawData(tmp.data(), tmp.size()); + #if QT_VERSION < 0x050000 value = QString::fromAscii(tmp.data(), nameSize_); + #else // For Qt5 + value = QString::fromLatin1(tmp.data(), nameSize_); + #endif break; case 'S': { @@ -247,7 +251,11 @@ QVariant QAMQP::Frame::readField( qint8 valueType, QDataStream &s ) tmp.resize(length_); } s.readRawData(tmp.data(), tmp.size()); - value = QString::fromAscii(tmp.data(), tmp.size()); + #if QT_VERSION < 0x050000 + value = QString::fromAscii(tmp.data(), nameSize_); + #else // For Qt5 + value = QString::fromLatin1(tmp.data(), nameSize_); + #endif break; case 'A': { @@ -394,14 +402,22 @@ void QAMQP::Frame::writeField( qint8 valueType, QDataStream &s, const QVariant & { QString str = value.toString(); s << quint8(str.length()); + #if QT_VERSION < 0x050000 s.writeRawData(str.toAscii().data(), str.length()); + #else // For Qt5 + s.writeRawData(str.toLatin1().data(), str.length()); + #endif } break; case 'S': { QString str = value.toString(); s << quint32(str.length()); + #if QT_VERSION < 0x050000 s.writeRawData(str.toAscii().data(), str.length()); + #else // For Qt5 + s.writeRawData(str.toLatin1().data(), str.length()); + #endif } break; case 'A': @@ -690,3 +706,11 @@ qint32 QAMQP::Frame::ContentBody::size() const { return body_.size(); } + +////////////////////////////////////////////////////////////////////////// + +QAMQP::Frame::Heartbeat::Heartbeat() : Base(ftHeartbeat) {} + +void QAMQP::Frame::Heartbeat::readPayload(QDataStream & stream) {} +void QAMQP::Frame::Heartbeat::writePayload(QDataStream & stream) const {} + diff --git a/src/qamqp/amqp_frame.h b/src/qamqp/amqp_frame.h index bf861aa..e813b46 100644 --- a/src/qamqp/amqp_frame.h +++ b/src/qamqp/amqp_frame.h @@ -361,6 +361,24 @@ namespace QAMQP private: QByteArray body_; }; + + /*! + @brief Class for working with heartbeat frames. + @detailed Implement frame for heartbeat send. + */ + class Heartbeat : public Base + { + public: + /*! + Heartbeat class constructor. + @detailed Construct frame class for sending. + */ + Heartbeat(); + + protected: + void writePayload(QDataStream & stream) const; + void readPayload(QDataStream & stream); + }; } } diff --git a/src/qamqp/amqp_network.cpp b/src/qamqp/amqp_network.cpp index c3dcb74..07ec9dd 100644 --- a/src/qamqp/amqp_network.cpp +++ b/src/qamqp/amqp_network.cpp @@ -141,6 +141,11 @@ void QAMQP::Network::readyRead() emit body(frame.channel(), frame.body()); } break; + case QAMQP::Frame::ftHeartbeat: + { + qDebug("Heartbeat"); + } + break; default: qWarning("Unknown frame type"); } diff --git a/src/test.h b/src/test.h index 217cf99..177a69f 100644 --- a/src/test.h +++ b/src/test.h @@ -16,7 +16,7 @@ public: private slots: void declared(); - void newMessage(); + void newMessage(); private: QPointer client_;