From 9699491ef16b18e27c46c67bc0de3b18d307e9c9 Mon Sep 17 00:00:00 2001 From: Alexey Shcherbakov Date: Sat, 23 Feb 2013 13:20:01 +0600 Subject: [PATCH] add heartbeat --- src/main.cpp | 8 ++++---- src/qamqp/amqp_connection.cpp | 20 +++++++++++++++++--- src/qamqp/amqp_connection.h | 1 + src/qamqp/amqp_connection_p.h | 4 ++++ src/qamqp/amqp_frame.cpp | 8 ++++++++ src/qamqp/amqp_frame.h | 18 ++++++++++++++++++ src/qamqp/amqp_network.cpp | 5 +++++ src/test.cpp | 2 +- src/test.h | 2 +- 9 files changed, 59 insertions(+), 9 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index a072bc9..963f238 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -32,16 +32,16 @@ void myMessageOutput(QtMsgType type, const QMessageLogContext &context, const QS QByteArray localMsg = msg.toLocal8Bit(); switch (type) { case QtDebugMsg: - fprintf(stderr, "#: %s (%s:%u, %s)\n", localMsg.constData(), context.file, context.line, context.function); + fprintf(stderr, "#: %s\n", localMsg.constData(), context.file, context.line, context.function); break; case QtWarningMsg: - fprintf(stderr, "%s (%s:%u, %s)\n", localMsg.constData(), context.file, context.line, context.function); + fprintf(stderr, "%s\n", localMsg.constData(), context.file, context.line, context.function); break; case QtCriticalMsg: - fprintf(stderr, "Critical: %s (%s:%u, %s)\n", localMsg.constData(), context.file, context.line, context.function); + fprintf(stderr, "Critical: %s\n", localMsg.constData(), context.file, context.line, context.function); break; case QtFatalMsg: - fprintf(stderr, "Fatal: %s (%s:%u, %s)\n", localMsg.constData(), context.file, context.line, context.function); + fprintf(stderr, "Fatal: %s\n", localMsg.constData(), context.file, context.line, context.function); abort(); } } diff --git a/src/qamqp/amqp_connection.cpp b/src/qamqp/amqp_connection.cpp index 987baea..cd18b04 100644 --- a/src/qamqp/amqp_connection.cpp +++ b/src/qamqp/amqp_connection.cpp @@ -4,11 +4,11 @@ #include "amqp_p.h" #include "amqp_frame.h" - #include #include #include #include +#include using namespace QAMQP; @@ -53,6 +53,9 @@ void ConnectionPrivate::init(Client * parent) { pq_func()->setParent(parent); client_ = parent; + heartbeatTimer_ = new QTimer(parent); + QObject::connect(heartbeatTimer_, SIGNAL(timeout()), + pq_func(), SLOT(_q_heartbeat())); } void ConnectionPrivate::startOk() @@ -91,7 +94,7 @@ void ConnectionPrivate::tuneOk() stream << qint16(0); //channel_max stream << qint32(FRAME_MAX); //frame_max - stream << qint16(0); //heartbeat + stream << qint16(heartbeatTimer_->interval()/1000); //heartbeat frame.setArguments(arguments_); client_->pd_func()->network_->sendFrame(frame); @@ -157,10 +160,16 @@ void ConnectionPrivate::tune( const QAMQP::Frame::Method & frame ) stream >> channel_max; stream >> frame_max; stream >> heartbeat; + qDebug(">> channel_max: %d", channel_max); qDebug(">> frame_max: %d", frame_max); qDebug(">> heartbeat: %d", heartbeat); + if(heartbeatTimer_) + { + heartbeatTimer_->setInterval(heartbeat * 1000); + heartbeatTimer_->start(); + } tuneOk(); open(); } @@ -239,7 +248,6 @@ void ConnectionPrivate::setQOS( qint32 prefetchSize, quint16 prefetchCount, int client_->pd_func()->network_->sendFrame(frame); } - bool ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame ) { if(frame.methodClass() != QAMQP::Frame::fcConnection) @@ -281,6 +289,12 @@ bool ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame ) return true; } +void ConnectionPrivate::_q_heartbeat() +{ + QAMQP::Frame::Heartbeat frame; + client_->pd_func()->network_->sendFrame(frame); +} + ////////////////////////////////////////////////////////////////////////// Connection::Connection( Client * parent /*= 0*/ ) diff --git a/src/qamqp/amqp_connection.h b/src/qamqp/amqp_connection.h index 703f133..3ab0e6d 100644 --- a/src/qamqp/amqp_connection.h +++ b/src/qamqp/amqp_connection.h @@ -46,6 +46,7 @@ namespace QAMQP friend class ClientPrivate; friend class ChannelPrivate; Q_PRIVATE_SLOT(pd_func(), void _q_method(const QAMQP::Frame::Method & frame)) + Q_PRIVATE_SLOT(pd_func(), void _q_heartbeat()) }; } diff --git a/src/qamqp/amqp_connection_p.h b/src/qamqp/amqp_connection_p.h index 1fe0f74..cbf2032 100644 --- a/src/qamqp/amqp_connection_p.h +++ b/src/qamqp/amqp_connection_p.h @@ -5,6 +5,8 @@ #include +class QTimer; + namespace QAMQP { class Client; @@ -39,12 +41,14 @@ namespace QAMQP void close(const QAMQP::Frame::Method & frame); void closeOk(const QAMQP::Frame::Method & frame); bool _q_method(const QAMQP::Frame::Method & frame); + void _q_heartbeat(); void setQOS(qint32 prefetchSize, quint16 prefetchCount, int channel, bool global); QPointer client_; bool closed_; bool connected; + QPointer heartbeatTimer_; Connection * const pq_ptr; diff --git a/src/qamqp/amqp_frame.cpp b/src/qamqp/amqp_frame.cpp index 50eef9f..3c05364 100644 --- a/src/qamqp/amqp_frame.cpp +++ b/src/qamqp/amqp_frame.cpp @@ -703,3 +703,11 @@ qint32 QAMQP::Frame::ContentBody::size() const { return body_.size(); } + +////////////////////////////////////////////////////////////////////////// + +QAMQP::Frame::Heartbeat::Heartbeat() : Base(ftHeartbeat) {} + +void QAMQP::Frame::Heartbeat::readPayload(QDataStream & stream) {} +void QAMQP::Frame::Heartbeat::writePayload(QDataStream & stream) const {} + diff --git a/src/qamqp/amqp_frame.h b/src/qamqp/amqp_frame.h index d5e91aa..4d8ca64 100644 --- a/src/qamqp/amqp_frame.h +++ b/src/qamqp/amqp_frame.h @@ -356,6 +356,24 @@ namespace QAMQP private: QByteArray body_; }; + + /*! + @brief Class for working with heartbeat frames. + @detailed Implement frame for heartbeat send. + */ + class Heartbeat : public Base + { + public: + /*! + Heartbeat class constructor. + @detailed Construct frame class for sending. + */ + Heartbeat(); + + protected: + void writePayload(QDataStream & stream) const; + void readPayload(QDataStream & stream); + }; } } diff --git a/src/qamqp/amqp_network.cpp b/src/qamqp/amqp_network.cpp index c3dcb74..07ec9dd 100644 --- a/src/qamqp/amqp_network.cpp +++ b/src/qamqp/amqp_network.cpp @@ -141,6 +141,11 @@ void QAMQP::Network::readyRead() emit body(frame.channel(), frame.body()); } break; + case QAMQP::Frame::ftHeartbeat: + { + qDebug("Heartbeat"); + } + break; default: qWarning("Unknown frame type"); } diff --git a/src/test.cpp b/src/test.cpp index e51be11..2312648 100644 --- a/src/test.cpp +++ b/src/test.cpp @@ -4,7 +4,7 @@ Test::Test() { - QUrl con(QString("amqp://guest:guest@localhost:5672/")); + QUrl con(QString("amqp://guest:guest@192.168.28.128:5672/")); client_ = new QAMQP::Client(this); client_->open(con); exchange_ = client_->createExchange("test.test2"); diff --git a/src/test.h b/src/test.h index 217cf99..177a69f 100644 --- a/src/test.h +++ b/src/test.h @@ -16,7 +16,7 @@ public: private slots: void declared(); - void newMessage(); + void newMessage(); private: QPointer client_;