add heartbeat
This commit is contained in:
parent
fb17149dca
commit
9699491ef1
|
|
@ -32,16 +32,16 @@ void myMessageOutput(QtMsgType type, const QMessageLogContext &context, const QS
|
|||
QByteArray localMsg = msg.toLocal8Bit();
|
||||
switch (type) {
|
||||
case QtDebugMsg:
|
||||
fprintf(stderr, "#: %s (%s:%u, %s)\n", localMsg.constData(), context.file, context.line, context.function);
|
||||
fprintf(stderr, "#: %s\n", localMsg.constData(), context.file, context.line, context.function);
|
||||
break;
|
||||
case QtWarningMsg:
|
||||
fprintf(stderr, "%s (%s:%u, %s)\n", localMsg.constData(), context.file, context.line, context.function);
|
||||
fprintf(stderr, "%s\n", localMsg.constData(), context.file, context.line, context.function);
|
||||
break;
|
||||
case QtCriticalMsg:
|
||||
fprintf(stderr, "Critical: %s (%s:%u, %s)\n", localMsg.constData(), context.file, context.line, context.function);
|
||||
fprintf(stderr, "Critical: %s\n", localMsg.constData(), context.file, context.line, context.function);
|
||||
break;
|
||||
case QtFatalMsg:
|
||||
fprintf(stderr, "Fatal: %s (%s:%u, %s)\n", localMsg.constData(), context.file, context.line, context.function);
|
||||
fprintf(stderr, "Fatal: %s\n", localMsg.constData(), context.file, context.line, context.function);
|
||||
abort();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,11 +4,11 @@
|
|||
#include "amqp_p.h"
|
||||
#include "amqp_frame.h"
|
||||
|
||||
|
||||
#include <QCoreApplication>
|
||||
#include <QDebug>
|
||||
#include <QDataStream>
|
||||
#include <QVariant>
|
||||
#include <QTimer>
|
||||
|
||||
using namespace QAMQP;
|
||||
|
||||
|
|
@ -53,6 +53,9 @@ void ConnectionPrivate::init(Client * parent)
|
|||
{
|
||||
pq_func()->setParent(parent);
|
||||
client_ = parent;
|
||||
heartbeatTimer_ = new QTimer(parent);
|
||||
QObject::connect(heartbeatTimer_, SIGNAL(timeout()),
|
||||
pq_func(), SLOT(_q_heartbeat()));
|
||||
}
|
||||
|
||||
void ConnectionPrivate::startOk()
|
||||
|
|
@ -91,7 +94,7 @@ void ConnectionPrivate::tuneOk()
|
|||
|
||||
stream << qint16(0); //channel_max
|
||||
stream << qint32(FRAME_MAX); //frame_max
|
||||
stream << qint16(0); //heartbeat
|
||||
stream << qint16(heartbeatTimer_->interval()/1000); //heartbeat
|
||||
|
||||
frame.setArguments(arguments_);
|
||||
client_->pd_func()->network_->sendFrame(frame);
|
||||
|
|
@ -157,10 +160,16 @@ void ConnectionPrivate::tune( const QAMQP::Frame::Method & frame )
|
|||
stream >> channel_max;
|
||||
stream >> frame_max;
|
||||
stream >> heartbeat;
|
||||
|
||||
qDebug(">> channel_max: %d", channel_max);
|
||||
qDebug(">> frame_max: %d", frame_max);
|
||||
qDebug(">> heartbeat: %d", heartbeat);
|
||||
|
||||
if(heartbeatTimer_)
|
||||
{
|
||||
heartbeatTimer_->setInterval(heartbeat * 1000);
|
||||
heartbeatTimer_->start();
|
||||
}
|
||||
tuneOk();
|
||||
open();
|
||||
}
|
||||
|
|
@ -239,7 +248,6 @@ void ConnectionPrivate::setQOS( qint32 prefetchSize, quint16 prefetchCount, int
|
|||
client_->pd_func()->network_->sendFrame(frame);
|
||||
}
|
||||
|
||||
|
||||
bool ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame )
|
||||
{
|
||||
if(frame.methodClass() != QAMQP::Frame::fcConnection)
|
||||
|
|
@ -281,6 +289,12 @@ bool ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame )
|
|||
return true;
|
||||
}
|
||||
|
||||
void ConnectionPrivate::_q_heartbeat()
|
||||
{
|
||||
QAMQP::Frame::Heartbeat frame;
|
||||
client_->pd_func()->network_->sendFrame(frame);
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Connection::Connection( Client * parent /*= 0*/ )
|
||||
|
|
|
|||
|
|
@ -46,6 +46,7 @@ namespace QAMQP
|
|||
friend class ClientPrivate;
|
||||
friend class ChannelPrivate;
|
||||
Q_PRIVATE_SLOT(pd_func(), void _q_method(const QAMQP::Frame::Method & frame))
|
||||
Q_PRIVATE_SLOT(pd_func(), void _q_heartbeat())
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,8 @@
|
|||
|
||||
#include <QPointer>
|
||||
|
||||
class QTimer;
|
||||
|
||||
namespace QAMQP
|
||||
{
|
||||
class Client;
|
||||
|
|
@ -39,12 +41,14 @@ namespace QAMQP
|
|||
void close(const QAMQP::Frame::Method & frame);
|
||||
void closeOk(const QAMQP::Frame::Method & frame);
|
||||
bool _q_method(const QAMQP::Frame::Method & frame);
|
||||
void _q_heartbeat();
|
||||
|
||||
void setQOS(qint32 prefetchSize, quint16 prefetchCount, int channel, bool global);
|
||||
|
||||
QPointer<Client> client_;
|
||||
bool closed_;
|
||||
bool connected;
|
||||
QPointer<QTimer> heartbeatTimer_;
|
||||
|
||||
Connection * const pq_ptr;
|
||||
|
||||
|
|
|
|||
|
|
@ -703,3 +703,11 @@ qint32 QAMQP::Frame::ContentBody::size() const
|
|||
{
|
||||
return body_.size();
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
|
||||
QAMQP::Frame::Heartbeat::Heartbeat() : Base(ftHeartbeat) {}
|
||||
|
||||
void QAMQP::Frame::Heartbeat::readPayload(QDataStream & stream) {}
|
||||
void QAMQP::Frame::Heartbeat::writePayload(QDataStream & stream) const {}
|
||||
|
||||
|
|
|
|||
|
|
@ -356,6 +356,24 @@ namespace QAMQP
|
|||
private:
|
||||
QByteArray body_;
|
||||
};
|
||||
|
||||
/*!
|
||||
@brief Class for working with heartbeat frames.
|
||||
@detailed Implement frame for heartbeat send.
|
||||
*/
|
||||
class Heartbeat : public Base
|
||||
{
|
||||
public:
|
||||
/*!
|
||||
Heartbeat class constructor.
|
||||
@detailed Construct frame class for sending.
|
||||
*/
|
||||
Heartbeat();
|
||||
|
||||
protected:
|
||||
void writePayload(QDataStream & stream) const;
|
||||
void readPayload(QDataStream & stream);
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -141,6 +141,11 @@ void QAMQP::Network::readyRead()
|
|||
emit body(frame.channel(), frame.body());
|
||||
}
|
||||
break;
|
||||
case QAMQP::Frame::ftHeartbeat:
|
||||
{
|
||||
qDebug("Heartbeat");
|
||||
}
|
||||
break;
|
||||
default:
|
||||
qWarning("Unknown frame type");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
Test::Test()
|
||||
{
|
||||
QUrl con(QString("amqp://guest:guest@localhost:5672/"));
|
||||
QUrl con(QString("amqp://guest:guest@192.168.28.128:5672/"));
|
||||
client_ = new QAMQP::Client(this);
|
||||
client_->open(con);
|
||||
exchange_ = client_->createExchange("test.test2");
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ public:
|
|||
private slots:
|
||||
|
||||
void declared();
|
||||
void newMessage();
|
||||
void newMessage();
|
||||
|
||||
private:
|
||||
QPointer<QAMQP::Client> client_;
|
||||
|
|
|
|||
Loading…
Reference in New Issue