diff --git a/src/qamqpclient.cpp b/src/qamqpclient.cpp index 7a22ad4..0488d80 100644 --- a/src/qamqpclient.cpp +++ b/src/qamqpclient.cpp @@ -45,6 +45,9 @@ void QAmqpClientPrivate::init() initSocket(); heartbeatTimer = new QTimer(q); QObject::connect(heartbeatTimer, SIGNAL(timeout()), q, SLOT(_q_heartbeat())); + reconnectTimer = new QTimer(q); + reconnectTimer->setSingleShot(true); + QObject::connect(reconnectTimer, SIGNAL(timeout()), q, SLOT(_q_connect())); authenticator = QSharedPointer( new QAmqpPlainAuthenticator(QString::fromLatin1(AMQP_LOGIN), QString::fromLatin1(AMQP_PSWD))); @@ -136,9 +139,13 @@ void QAmqpClientPrivate::parseConnectionString(const QString &uri) void QAmqpClientPrivate::_q_connect() { + if (reconnectTimer) + reconnectTimer->stop(); if (socket->state() != QAbstractSocket::UnconnectedState) { qAmqpDebug() << Q_FUNC_INFO << "socket already connected, disconnecting.."; _q_disconnect(); + // We need to explicitly close connection here because either way it will not be closed until we receive closeOk + closeConnection(); } qAmqpDebug() << "connecting to host: " << host << ", port: " << port; @@ -150,6 +157,8 @@ void QAmqpClientPrivate::_q_connect() void QAmqpClientPrivate::_q_disconnect() { + if (reconnectTimer) + reconnectTimer->stop(); if (socket->state() == QAbstractSocket::UnconnectedState) { qAmqpDebug() << Q_FUNC_INFO << "already disconnected"; return; @@ -162,10 +171,10 @@ void QAmqpClientPrivate::_q_disconnect() // private slots void QAmqpClientPrivate::_q_socketConnected() { + if (reconnectTimer) + reconnectTimer->stop(); if(reconnectFixedTimeout == false) - { timeout = 0; - } char header[8] = {'A', 'M', 'Q', 'P', 0, 0, 9, 1}; socket->write(header, 8); } @@ -188,7 +197,6 @@ void QAmqpClientPrivate::_q_heartbeat() void QAmqpClientPrivate::_q_socketError(QAbstractSocket::SocketError error) { - Q_Q(QAmqpClient); if(reconnectFixedTimeout == false) { if (timeout <= 0) { @@ -223,9 +231,9 @@ void QAmqpClientPrivate::_q_socketError(QAbstractSocket::SocketError error) errorString = socket->errorString(); - if (autoReconnect) { + if (autoReconnect && reconnectTimer) { qAmqpDebug() << "trying to reconnect after: " << timeout << "ms"; - QTimer::singleShot(timeout, q, SLOT(_q_connect())); + reconnectTimer->start(timeout); } } @@ -336,6 +344,18 @@ void QAmqpClientPrivate::sendFrame(const QAmqpFrame &frame) stream << frame; } +void QAmqpClientPrivate::closeConnection() +{ + qAmqpDebug("AMQP: closing connection"); + + connected = false; + if (reconnectTimer) + reconnectTimer->stop(); + if (heartbeatTimer) + heartbeatTimer->stop(); + socket->disconnectFromHost(); +} + bool QAmqpClientPrivate::_q_method(const QAmqpMethodFrame &frame) { Q_ASSERT(frame.methodClass() == QAmqpFrame::Connection); @@ -453,11 +473,7 @@ void QAmqpClientPrivate::closeOk(const QAmqpMethodFrame &frame) { Q_UNUSED(frame) qAmqpDebug("-> connection#closeOk()"); - - connected = false; - if (heartbeatTimer) - heartbeatTimer->stop(); - socket->disconnectFromHost(); + closeConnection(); } void QAmqpClientPrivate::close(const QAmqpMethodFrame &frame) @@ -482,7 +498,7 @@ void QAmqpClientPrivate::close(const QAmqpMethodFrame &frame) // if it was a force disconnect, simulate receiving a closeOk if (checkError == QAMQP::ConnectionForcedError) { - closeOk(QAmqpMethodFrame()); + closeConnection(); if (autoReconnect) { qAmqpDebug() << "trying to reconnect after: " << timeout << "ms"; QTimer::singleShot(timeout, q, SLOT(_q_connect())); @@ -499,6 +515,7 @@ void QAmqpClientPrivate::close(const QAmqpMethodFrame &frame) QAmqpMethodFrame closeOkFrame(QAmqpFrame::Connection, QAmqpClientPrivate::miCloseOk); qAmqpDebug("<- connection#closeOk()"); sendFrame(closeOkFrame); + closeConnection(); } void QAmqpClientPrivate::startOk() @@ -921,4 +938,11 @@ void QAmqpClient::disconnectFromHost() d->_q_disconnect(); } +void QAmqpClient::forceDisconnectFromHost() +{ + Q_D(QAmqpClient); + d->_q_disconnect(); + d->closeConnection(); +} + #include "moc_qamqpclient.cpp" diff --git a/src/qamqpclient.h b/src/qamqpclient.h index 0bd8943..283394e 100644 --- a/src/qamqpclient.h +++ b/src/qamqpclient.h @@ -108,6 +108,7 @@ public: void connectToHost(const QString &uri = QString()); void connectToHost(const QHostAddress &address, quint16 port = AMQP_PORT); void disconnectFromHost(); + void forceDisconnectFromHost(); Q_SIGNALS: void connected(); diff --git a/src/qamqpclient_p.h b/src/qamqpclient_p.h index 2de4a86..397e426 100644 --- a/src/qamqpclient_p.h +++ b/src/qamqpclient_p.h @@ -44,6 +44,8 @@ public: void parseConnectionString(const QString &uri); void sendFrame(const QAmqpFrame &frame); + void closeConnection(); + // private slots void _q_socketConnected(); void _q_socketDisconnected(); @@ -95,6 +97,7 @@ public: bool closed; bool connected; QPointer heartbeatTimer; + QPointer reconnectTimer; QAmqpTable customProperties; qint16 channelMax; qint16 heartbeatDelay;