diff --git a/src/amqp_channel.cpp b/src/amqp_channel.cpp index 74d5dd9..12e2d64 100644 --- a/src/amqp_channel.cpp +++ b/src/amqp_channel.cpp @@ -13,7 +13,7 @@ ChannelPrivate::ChannelPrivate(Channel *q) : channelNumber(0), opened(false), needOpen(true), - error(Channel::NoError), + error(QAMQP::NoError), q_ptr(q) { } @@ -163,8 +163,8 @@ void ChannelPrivate::close(const Frame::Method &frame) stream >> classId; stream >> methodId; - Channel::ChannelError checkError = static_cast(code); - if (checkError != Channel::NoError) { + Error checkError = static_cast(code); + if (checkError != QAMQP::NoError) { error = checkError; errorString = qPrintable(text); Q_EMIT q->error(error); @@ -282,7 +282,7 @@ void Channel::setQOS(qint32 prefetchSize, quint16 prefetchCount) d->setQOS(prefetchSize, prefetchCount); } -Channel::ChannelError Channel::error() const +Error Channel::error() const { Q_D(const Channel); return d->error; diff --git a/src/amqp_channel.h b/src/amqp_channel.h index b726d81..da3c7dd 100644 --- a/src/amqp_channel.h +++ b/src/amqp_channel.h @@ -24,16 +24,7 @@ public: QString name() const; void setName(const QString &name); - enum ChannelError { - NoError = 0, - ContentTooLargeError = 311, - NoConsumersError = 313, - AccessRefusedError = 403, - NotFoundError = 404, - ResourceLockedError = 405, - PreconditionFailedError = 406 - }; - ChannelError error() const; + Error error() const; QString errorString() const; public Q_SLOTS: @@ -45,7 +36,7 @@ Q_SIGNALS: void opened(); void closed(); void flowChanged(bool enabled); - void error(ChannelError error); + void error(QAMQP::Error error); protected: virtual void channelOpened() = 0; diff --git a/src/amqp_channel_p.h b/src/amqp_channel_p.h index ec17cdf..6f2f244 100644 --- a/src/amqp_channel_p.h +++ b/src/amqp_channel_p.h @@ -77,7 +77,7 @@ public: bool opened; bool needOpen; - Channel::ChannelError error; + Error error; QString errorString; Q_DECLARE_PUBLIC(Channel) diff --git a/src/amqp_client.cpp b/src/amqp_client.cpp index ba7a930..a1ee157 100644 --- a/src/amqp_client.cpp +++ b/src/amqp_client.cpp @@ -25,7 +25,7 @@ ClientPrivate::ClientPrivate(Client *q) channelMax(0), heartbeatDelay(0), frameMax(AMQP_FRAME_MAX), - error(Client::NoError), + error(QAMQP::NoError), q_ptr(q) { } @@ -55,6 +55,7 @@ void ClientPrivate::initSocket() Q_Q(Client); socket = new QTcpSocket(q); QObject::connect(socket, SIGNAL(connected()), q, SLOT(_q_socketConnected())); + QObject::connect(socket, SIGNAL(disconnected()), q, SLOT(_q_socketDisconnected())); QObject::connect(socket, SIGNAL(readyRead()), q, SLOT(_q_readyRead())); QObject::connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), q, SLOT(_q_socketError(QAbstractSocket::SocketError))); @@ -105,6 +106,16 @@ void ClientPrivate::_q_socketConnected() socket->write(header, 8); } +void ClientPrivate::_q_socketDisconnected() +{ + Q_Q(Client); + buffer.clear(); + if (connected) { + connected = false; + Q_EMIT q->disconnected(); + } +} + void ClientPrivate::_q_heartbeat() { Frame::Heartbeat frame; @@ -131,10 +142,14 @@ void ClientPrivate::_q_socketError(QAbstractSocket::SocketError error) case QAbstractSocket::ProxyConnectionTimeoutError: default: - qWarning() << "AMQP: Socket Error: " << socket->errorString(); + qAmqpDebug() << "socket Error: " << socket->errorString(); break; } + // per spec, on any error we need to close the socket immediately + // and send no more data; + socket->close(); + if (autoReconnect) { QTimer::singleShot(timeout, q, SLOT(_q_connect())); } @@ -167,7 +182,7 @@ void ClientPrivate::_q_readyRead() { Frame::Method frame(streamB); if (frame.size() > frameMax) { - close(Client::FrameError, "frame size too large"); + close(FrameError, "frame size too large"); return; } @@ -183,7 +198,7 @@ void ClientPrivate::_q_readyRead() { Frame::Content frame(streamB); if (frame.size() > frameMax) { - close(Client::FrameError, "frame size too large"); + close(FrameError, "frame size too large"); return; } @@ -195,7 +210,7 @@ void ClientPrivate::_q_readyRead() { Frame::ContentBody frame(streamB); if (frame.size() > frameMax) { - close(Client::FrameError, "frame size too large"); + close(FrameError, "frame size too large"); return; } @@ -372,8 +387,8 @@ void ClientPrivate::close(const Frame::Method &frame) stream >> classId; stream >> methodId; - Client::ConnectionError checkError = static_cast(code); - if (checkError != Client::NoError) { + Error checkError = static_cast(code); + if (checkError != QAMQP::NoError) { error = checkError; errorString = qPrintable(text); Q_EMIT q->error(error); @@ -749,6 +764,7 @@ void SslClientPrivate::initSocket() Q_Q(Client); QSslSocket *sslSocket = new QSslSocket(q); QObject::connect(sslSocket, SIGNAL(connected()), q, SLOT(_q_socketConnected())); + QObject::connect(sslSocket, SIGNAL(disconnected()), q, SLOT(_q_socketDisconnected())); QObject::connect(sslSocket, SIGNAL(readyRead()), q, SLOT(_q_readyRead())); QObject::connect(sslSocket, SIGNAL(error(QAbstractSocket::SocketError)), q, SLOT(_q_socketError(QAbstractSocket::SocketError))); diff --git a/src/amqp_client.h b/src/amqp_client.h index 4a415f1..471bca6 100644 --- a/src/amqp_client.h +++ b/src/amqp_client.h @@ -73,21 +73,7 @@ public: void addCustomProperty(const QString &name, const QString &value); QString customProperty(const QString &name) const; - enum ConnectionError { - NoError = 0, - ConnectionForcedError = 320, - InvalidPathError = 402, - FrameError = 501, - SyntaxError = 502, - CommandInvalidError = 503, - ChannelError = 504, - UnexpectedFrameError = 505, - ResourceError = 506, - NotAllowedError = 530, - NotImplementedError = 540, - InternalError = 541 - }; - ConnectionError error() const; + Error error() const; QString errorString() const; // channels @@ -105,7 +91,7 @@ public: Q_SIGNALS: void connected(); void disconnected(); - void error(ConnectionError error); + void error(Error error); protected: Client(ClientPrivate *dd, QObject *parent = 0); @@ -116,6 +102,7 @@ protected: private: Q_PRIVATE_SLOT(d_func(), void _q_socketConnected()) + Q_PRIVATE_SLOT(d_func(), void _q_socketDisconnected()) Q_PRIVATE_SLOT(d_func(), void _q_readyRead()) Q_PRIVATE_SLOT(d_func(), void _q_socketError(QAbstractSocket::SocketError error)) Q_PRIVATE_SLOT(d_func(), void _q_heartbeat()) diff --git a/src/amqp_client_p.h b/src/amqp_client_p.h index b62f68d..38d29a8 100644 --- a/src/amqp_client_p.h +++ b/src/amqp_client_p.h @@ -42,6 +42,7 @@ public: // private slots void _q_socketConnected(); + void _q_socketDisconnected(); void _q_readyRead(); void _q_socketError(QAbstractSocket::SocketError error); void _q_heartbeat(); @@ -94,7 +95,7 @@ public: qint16 heartbeatDelay; qint32 frameMax; - Client::ConnectionError error; + Error error; QString errorString; Client * const q_ptr; diff --git a/src/amqp_global.h b/src/amqp_global.h index d7e1bed..8bf2178 100644 --- a/src/amqp_global.h +++ b/src/amqp_global.h @@ -28,4 +28,29 @@ #define qAmqpDebug if (qgetenv("QAMQP_DEBUG").isEmpty()); else qDebug +namespace QAMQP { + +enum Error { + NoError = 0, + ContentTooLargeError = 311, + NoConsumersError = 313, + ConnectionForcedError = 320, + InvalidPathError = 402, + AccessRefusedError = 403, + NotFoundError = 404, + ResourceLockedError = 405, + PreconditionFailedError = 406, + FrameError = 501, + SyntaxError = 502, + CommandInvalidError = 503, + ChannelError = 504, + UnexpectedFrameError = 505, + ResourceError = 506, + NotAllowedError = 530, + NotImplementedError = 540, + InternalError = 541 +}; + +} // namespace QAMQP + #endif // qamqp_global_h__ diff --git a/tests/auto/qamqpclient/tst_qamqpclient.cpp b/tests/auto/qamqpclient/tst_qamqpclient.cpp index 0046d8e..9155c81 100644 --- a/tests/auto/qamqpclient/tst_qamqpclient.cpp +++ b/tests/auto/qamqpclient/tst_qamqpclient.cpp @@ -13,7 +13,6 @@ private Q_SLOTS: void connect(); void connectDisconnect(); void invalidAuthenticationMechanism(); - void tune(); private: diff --git a/tests/auto/qamqpexchange/tst_qamqpexchange.cpp b/tests/auto/qamqpexchange/tst_qamqpexchange.cpp index 84d04e5..bf7512f 100644 --- a/tests/auto/qamqpexchange/tst_qamqpexchange.cpp +++ b/tests/auto/qamqpexchange/tst_qamqpexchange.cpp @@ -72,8 +72,8 @@ void tst_QAMQPExchange::removeIfUnused() QVERIFY(waitForSignal(queue, SIGNAL(bound()))); exchange->remove(Exchange::roIfUnused); - QVERIFY(waitForSignal(exchange, SIGNAL(error(ChannelError)))); - QCOMPARE(exchange->error(), Exchange::PreconditionFailedError); + QVERIFY(waitForSignal(exchange, SIGNAL(error(QAMQP::Error)))); + QCOMPARE(exchange->error(), QAMQP::PreconditionFailedError); QVERIFY(!exchange->errorString().isEmpty()); // cleanup diff --git a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp index 551a68e..5d775bd 100644 --- a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp +++ b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp @@ -116,8 +116,8 @@ void tst_QAMQPQueue::exclusiveAccess() QVERIFY(waitForSignal(&secondClient, SIGNAL(connected()))); Queue *passiveQueue = secondClient.createQueue("test-exclusive-queue"); passiveQueue->declare(Queue::Passive); - QVERIFY(waitForSignal(passiveQueue, SIGNAL(error(ChannelError)))); - QCOMPARE(passiveQueue->error(), Channel::ResourceLockedError); + QVERIFY(waitForSignal(passiveQueue, SIGNAL(error(QAMQP::Error)))); + QCOMPARE(passiveQueue->error(), QAMQP::ResourceLockedError); secondClient.disconnectFromHost(); QVERIFY(waitForSignal(&secondClient, SIGNAL(disconnected()))); @@ -138,8 +138,8 @@ void tst_QAMQPQueue::exclusiveRemoval() QVERIFY(waitForSignal(&secondClient, SIGNAL(connected()))); Queue *passiveQueue = secondClient.createQueue("test-exclusive-queue"); passiveQueue->declare(Queue::Passive); - QVERIFY(waitForSignal(passiveQueue, SIGNAL(error(ChannelError)))); - QCOMPARE(passiveQueue->error(), Channel::NotFoundError); + QVERIFY(waitForSignal(passiveQueue, SIGNAL(error(QAMQP::Error)))); + QCOMPARE(passiveQueue->error(), QAMQP::NotFoundError); secondClient.disconnectFromHost(); QVERIFY(waitForSignal(&secondClient, SIGNAL(disconnected()))); } @@ -161,8 +161,8 @@ void tst_QAMQPQueue::removeIfUnused() queue->consume(); queue->remove(Queue::roIfUnused); - QVERIFY(waitForSignal(queue, SIGNAL(error(ChannelError)))); - QCOMPARE(queue->error(), Channel::PreconditionFailedError); + QVERIFY(waitForSignal(queue, SIGNAL(error(QAMQP::Error)))); + QCOMPARE(queue->error(), QAMQP::PreconditionFailedError); QVERIFY(!queue->errorString().isEmpty()); } @@ -185,8 +185,8 @@ void tst_QAMQPQueue::removeIfEmpty() QVERIFY(waitForSignal(testDeleteQueue, SIGNAL(declared()))); testDeleteQueue->remove(Queue::roIfEmpty); - QVERIFY(waitForSignal(testDeleteQueue, SIGNAL(error(ChannelError)))); - QCOMPARE(testDeleteQueue->error(), Channel::PreconditionFailedError); + QVERIFY(waitForSignal(testDeleteQueue, SIGNAL(error(QAMQP::Error)))); + QCOMPARE(testDeleteQueue->error(), QAMQP::PreconditionFailedError); QVERIFY(!testDeleteQueue->errorString().isEmpty()); secondClient.disconnectFromHost();