Compare commits
10 Commits
6d2a7da3a2
...
0300258b93
| Author | SHA1 | Date |
|---|---|---|
|
|
0300258b93 | |
|
|
f5b9d2e333 | |
|
|
d912bddfce | |
|
|
6cc6bbcdfa | |
|
|
b5c660a1ac | |
|
|
ff81962201 | |
|
|
f63b294e79 | |
|
|
737349ab40 | |
|
|
a0bc160ea1 | |
|
|
d20aa2c89d |
|
|
@ -1,8 +1,8 @@
|
||||||
language: cpp
|
language: cpp
|
||||||
cache: apt
|
cache: apt
|
||||||
env:
|
env:
|
||||||
- QT_SELECT=qt4
|
|
||||||
- QT_SELECT=qt5
|
- QT_SELECT=qt5
|
||||||
|
- QT_SELECT=qt6
|
||||||
services:
|
services:
|
||||||
- rabbitmq
|
- rabbitmq
|
||||||
before_install:
|
before_install:
|
||||||
|
|
|
||||||
12
README.md
12
README.md
|
|
@ -3,7 +3,7 @@
|
||||||
|
|
||||||
QAMQP
|
QAMQP
|
||||||
=============
|
=============
|
||||||
A Qt4/Qt5 implementation of AMQP 0.9.1, focusing primarily on RabbitMQ support.
|
A Qt5/Qt6 implementation of AMQP 0.9.1, focusing primarily on RabbitMQ support.
|
||||||
|
|
||||||
Usage
|
Usage
|
||||||
------------
|
------------
|
||||||
|
|
@ -16,8 +16,16 @@ Usage
|
||||||
|
|
||||||
Documentation
|
Documentation
|
||||||
------------
|
------------
|
||||||
Coming soon!
|
Tests checked and integrated against rabbitmq 3.11 (August 1, 2022)
|
||||||
|
Qt5.6.3 (MSVC2017) 32Bit
|
||||||
|
Qt5.15.2 (MSVC2019, MSVC2022, MinGW, Clang) 32 and 64Bit
|
||||||
|
Qt6.5 (MSVC2019, MSVC2022) 64Bit
|
||||||
|
|
||||||
|
A good starting point is :
|
||||||
|
* running a local RabbitMQ,
|
||||||
|
* browse to http://localhost:15672/#/queues (guest/guest)
|
||||||
|
* Start the "receive" sample and see in your browser the "hello" queue appear
|
||||||
|
* publish a message there
|
||||||
|
|
||||||
AMQP Support
|
AMQP Support
|
||||||
------------
|
------------
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
QAMQP_VERSION = 0.5.0
|
QAMQP_VERSION = 0.6.0
|
||||||
|
|
||||||
isEmpty(QAMQP_LIBRARY_TYPE) {
|
isEmpty(QAMQP_LIBRARY_TYPE) {
|
||||||
QAMQP_LIBRARY_TYPE = shared
|
QAMQP_LIBRARY_TYPE = shared
|
||||||
|
|
@ -7,6 +7,7 @@ isEmpty(QAMQP_LIBRARY_TYPE) {
|
||||||
QT += network
|
QT += network
|
||||||
QAMQP_INCLUDEPATH = $${PWD}/src
|
QAMQP_INCLUDEPATH = $${PWD}/src
|
||||||
QAMQP_LIBS = -lqamqp
|
QAMQP_LIBS = -lqamqp
|
||||||
|
|
||||||
CONFIG(debug, debug|release){
|
CONFIG(debug, debug|release){
|
||||||
QAMQP_LIBS = -lqamqpd
|
QAMQP_LIBS = -lqamqpd
|
||||||
}
|
}
|
||||||
|
|
@ -14,7 +15,6 @@ contains(QAMQP_LIBRARY_TYPE, staticlib) {
|
||||||
DEFINES += QAMQP_STATIC
|
DEFINES += QAMQP_STATIC
|
||||||
} else {
|
} else {
|
||||||
DEFINES += QAMQP_SHARED
|
DEFINES += QAMQP_SHARED
|
||||||
win32:QAMQP_LIBS = -lqamqp0
|
|
||||||
}
|
}
|
||||||
|
|
||||||
isEmpty(PREFIX) {
|
isEmpty(PREFIX) {
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
debug/*
|
debug/*
|
||||||
release/*
|
release/*
|
||||||
*.lib
|
*.lib
|
||||||
|
*.dll
|
||||||
*.so
|
*.so
|
||||||
*.a
|
*.a
|
||||||
*.prl
|
*.prl
|
||||||
|
|
|
||||||
|
|
@ -62,10 +62,25 @@ void QAmqpClientPrivate::initSocket()
|
||||||
QObject::connect(socket, SIGNAL(connected()), q, SLOT(_q_socketConnected()));
|
QObject::connect(socket, SIGNAL(connected()), q, SLOT(_q_socketConnected()));
|
||||||
QObject::connect(socket, SIGNAL(disconnected()), q, SLOT(_q_socketDisconnected()));
|
QObject::connect(socket, SIGNAL(disconnected()), q, SLOT(_q_socketDisconnected()));
|
||||||
QObject::connect(socket, SIGNAL(readyRead()), q, SLOT(_q_readyRead()));
|
QObject::connect(socket, SIGNAL(readyRead()), q, SLOT(_q_readyRead()));
|
||||||
QObject::connect(socket, SIGNAL(error(QAbstractSocket::SocketError)),
|
#if QT_VERSION >= 0x060000
|
||||||
q, SLOT(_q_socketError(QAbstractSocket::SocketError)));
|
QObject::connect(socket,
|
||||||
QObject::connect(socket, SIGNAL(error(QAbstractSocket::SocketError)),
|
SIGNAL(errorOccurred(QAbstractSocket::SocketError)),
|
||||||
q, SIGNAL(socketError(QAbstractSocket::SocketError)));
|
q,
|
||||||
|
SLOT(_q_socketError(QAbstractSocket::SocketError)));
|
||||||
|
QObject::connect(socket,
|
||||||
|
SIGNAL(errorOccurred(QAbstractSocket::SocketError)),
|
||||||
|
q,
|
||||||
|
SIGNAL(socketErrorOccurred(QAbstractSocket::SocketError)));
|
||||||
|
#else
|
||||||
|
QObject::connect(socket,
|
||||||
|
SIGNAL(error(QAbstractSocket::SocketError)),
|
||||||
|
q,
|
||||||
|
SLOT(_q_socketError(QAbstractSocket::SocketError)));
|
||||||
|
QObject::connect(socket,
|
||||||
|
SIGNAL(error(QAbstractSocket::SocketError)),
|
||||||
|
q,
|
||||||
|
SIGNAL(socketErrorOccurred(QAbstractSocket::SocketError)));
|
||||||
|
#endif
|
||||||
QObject::connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)),
|
QObject::connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)),
|
||||||
q, SIGNAL(socketStateChanged(QAbstractSocket::SocketState)));
|
q, SIGNAL(socketStateChanged(QAbstractSocket::SocketState)));
|
||||||
QObject::connect(socket, SIGNAL(sslErrors(QList<QSslError>)),
|
QObject::connect(socket, SIGNAL(sslErrors(QList<QSslError>)),
|
||||||
|
|
@ -107,11 +122,7 @@ void QAmqpClientPrivate::setPassword(const QString &password)
|
||||||
|
|
||||||
void QAmqpClientPrivate::parseConnectionString(const QString &uri)
|
void QAmqpClientPrivate::parseConnectionString(const QString &uri)
|
||||||
{
|
{
|
||||||
#if QT_VERSION > 0x040801
|
|
||||||
QUrl connectionString = QUrl::fromUserInput(uri);
|
QUrl connectionString = QUrl::fromUserInput(uri);
|
||||||
#else
|
|
||||||
QUrl connectionString(uri, QUrl::TolerantMode);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (connectionString.scheme() != AMQP_SCHEME &&
|
if (connectionString.scheme() != AMQP_SCHEME &&
|
||||||
connectionString.scheme() != AMQP_SSL_SCHEME) {
|
connectionString.scheme() != AMQP_SSL_SCHEME) {
|
||||||
|
|
@ -126,15 +137,9 @@ void QAmqpClientPrivate::parseConnectionString(const QString &uri)
|
||||||
QString vhost = connectionString.path();
|
QString vhost = connectionString.path();
|
||||||
if (vhost.startsWith("/") && vhost.size() > 1)
|
if (vhost.startsWith("/") && vhost.size() > 1)
|
||||||
vhost = vhost.mid(1);
|
vhost = vhost.mid(1);
|
||||||
#if QT_VERSION <= 0x050200
|
|
||||||
virtualHost = QUrl::fromPercentEncoding(vhost.toUtf8());
|
|
||||||
setPassword(QUrl::fromPercentEncoding(connectionString.password().toUtf8()));
|
|
||||||
setUsername(QUrl::fromPercentEncoding(connectionString.userName().toUtf8()));
|
|
||||||
#else
|
|
||||||
virtualHost = vhost;
|
virtualHost = vhost;
|
||||||
setPassword(connectionString.password());
|
setPassword(connectionString.password());
|
||||||
setUsername(connectionString.userName());
|
setUsername(connectionString.userName());
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void QAmqpClientPrivate::_q_connect()
|
void QAmqpClientPrivate::_q_connect()
|
||||||
|
|
@ -239,6 +244,8 @@ void QAmqpClientPrivate::_q_socketError(QAbstractSocket::SocketError error)
|
||||||
|
|
||||||
void QAmqpClientPrivate::_q_readyRead()
|
void QAmqpClientPrivate::_q_readyRead()
|
||||||
{
|
{
|
||||||
|
Q_Q(QAmqpClient);
|
||||||
|
|
||||||
while (socket->bytesAvailable() >= QAmqpFrame::HEADER_SIZE) {
|
while (socket->bytesAvailable() >= QAmqpFrame::HEADER_SIZE) {
|
||||||
unsigned char headerData[QAmqpFrame::HEADER_SIZE];
|
unsigned char headerData[QAmqpFrame::HEADER_SIZE];
|
||||||
socket->peek((char*)headerData, QAmqpFrame::HEADER_SIZE);
|
socket->peek((char*)headerData, QAmqpFrame::HEADER_SIZE);
|
||||||
|
|
@ -323,6 +330,7 @@ void QAmqpClientPrivate::_q_readyRead()
|
||||||
}
|
}
|
||||||
|
|
||||||
qAmqpDebug("AMQP: Heartbeat");
|
qAmqpDebug("AMQP: Heartbeat");
|
||||||
|
Q_EMIT q->heartbeat();
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
|
@ -528,7 +536,11 @@ void QAmqpClientPrivate::startOk()
|
||||||
clientProperties["version"] = QString(QAMQP_VERSION);
|
clientProperties["version"] = QString(QAMQP_VERSION);
|
||||||
clientProperties["platform"] = QString("Qt %1").arg(qVersion());
|
clientProperties["platform"] = QString("Qt %1").arg(qVersion());
|
||||||
clientProperties["product"] = QString("QAMQP");
|
clientProperties["product"] = QString("QAMQP");
|
||||||
|
#if QT_VERSION >= 0x060000
|
||||||
|
clientProperties.insert(customProperties);
|
||||||
|
#else
|
||||||
clientProperties.unite(customProperties);
|
clientProperties.unite(customProperties);
|
||||||
|
#endif
|
||||||
stream << clientProperties;
|
stream << clientProperties;
|
||||||
|
|
||||||
authenticator->write(stream);
|
authenticator->write(stream);
|
||||||
|
|
|
||||||
|
|
@ -113,8 +113,9 @@ public:
|
||||||
Q_SIGNALS:
|
Q_SIGNALS:
|
||||||
void connected();
|
void connected();
|
||||||
void disconnected();
|
void disconnected();
|
||||||
|
void heartbeat();
|
||||||
void error(QAMQP::Error error);
|
void error(QAMQP::Error error);
|
||||||
void socketError(QAbstractSocket::SocketError error);
|
void socketErrorOccurred(QAbstractSocket::SocketError error);
|
||||||
void socketStateChanged(QAbstractSocket::SocketState state);
|
void socketStateChanged(QAbstractSocket::SocketState state);
|
||||||
void sslErrors(const QList<QSslError> &errors);
|
void sslErrors(const QList<QSslError> &errors);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,6 @@ class QAMQP_EXPORT QAmqpExchange : public QAmqpChannel
|
||||||
Q_OBJECT
|
Q_OBJECT
|
||||||
Q_PROPERTY(QString type READ type CONSTANT)
|
Q_PROPERTY(QString type READ type CONSTANT)
|
||||||
Q_PROPERTY(ExchangeOptions options READ options CONSTANT)
|
Q_PROPERTY(ExchangeOptions options READ options CONSTANT)
|
||||||
Q_ENUMS(ExchangeOptions)
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
virtual ~QAmqpExchange();
|
virtual ~QAmqpExchange();
|
||||||
|
|
@ -68,6 +67,7 @@ public:
|
||||||
};
|
};
|
||||||
Q_DECLARE_FLAGS(ExchangeOptions, ExchangeOption)
|
Q_DECLARE_FLAGS(ExchangeOptions, ExchangeOption)
|
||||||
ExchangeOptions options() const;
|
ExchangeOptions options() const;
|
||||||
|
Q_ENUM(ExchangeOptions)
|
||||||
|
|
||||||
bool isDeclared() const;
|
bool isDeclared() const;
|
||||||
|
|
||||||
|
|
@ -83,11 +83,11 @@ Q_SIGNALS:
|
||||||
|
|
||||||
public Q_SLOTS:
|
public Q_SLOTS:
|
||||||
// AMQP Exchange
|
// AMQP Exchange
|
||||||
void declare(ExchangeType type = Direct,
|
void declare(QAmqpExchange::ExchangeType type = Direct,
|
||||||
ExchangeOptions options = NoOptions,
|
QAmqpExchange::ExchangeOptions options = NoOptions,
|
||||||
const QAmqpTable &args = QAmqpTable());
|
const QAmqpTable &args = QAmqpTable());
|
||||||
void declare(const QString &type,
|
void declare(const QString &type,
|
||||||
ExchangeOptions options = NoOptions,
|
QAmqpExchange::ExchangeOptions options = NoOptions,
|
||||||
const QAmqpTable &args = QAmqpTable());
|
const QAmqpTable &args = QAmqpTable());
|
||||||
void remove(int options = roIfUnused|roNoWait);
|
void remove(int options = roIfUnused|roNoWait);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
#include <QDateTime>
|
#include <QDateTime>
|
||||||
#include <QList>
|
|
||||||
#include <QDebug>
|
#include <QDebug>
|
||||||
|
#include <QIODevice>
|
||||||
|
#include <QList>
|
||||||
|
|
||||||
#include "qamqptable.h"
|
#include "qamqptable.h"
|
||||||
#include "qamqpglobal.h"
|
#include "qamqpglobal.h"
|
||||||
|
|
@ -73,7 +74,13 @@ QDataStream &operator<<(QDataStream &stream, const QAmqpFrame &frame)
|
||||||
|
|
||||||
// write end
|
// write end
|
||||||
stream << qint8(QAmqpFrame::FRAME_END);
|
stream << qint8(QAmqpFrame::FRAME_END);
|
||||||
stream.device()->waitForBytesWritten(QAmqpFrame::writeTimeout());
|
|
||||||
|
int writeTimeout = QAmqpFrame::writeTimeout();
|
||||||
|
if(writeTimeout >= -1)
|
||||||
|
{
|
||||||
|
stream.device()->waitForBytesWritten(writeTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
return stream;
|
return stream;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -202,7 +209,11 @@ QVariant QAmqpFrame::readAmqpField(QDataStream &s, QAmqpMetaType::ValueType type
|
||||||
{
|
{
|
||||||
qulonglong tmp_value;
|
qulonglong tmp_value;
|
||||||
s >> tmp_value;
|
s >> tmp_value;
|
||||||
|
#if (QT_VERSION >= QT_VERSION_CHECK(5, 8, 0))
|
||||||
|
return QDateTime::fromSecsSinceEpoch(tmp_value);
|
||||||
|
#else
|
||||||
return QDateTime::fromTime_t(tmp_value);
|
return QDateTime::fromTime_t(tmp_value);
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
case QAmqpMetaType::Hash:
|
case QAmqpMetaType::Hash:
|
||||||
{
|
{
|
||||||
|
|
@ -256,7 +267,11 @@ void QAmqpFrame::writeAmqpField(QDataStream &s, QAmqpMetaType::ValueType type, c
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case QAmqpMetaType::Timestamp:
|
case QAmqpMetaType::Timestamp:
|
||||||
|
#if (QT_VERSION >= QT_VERSION_CHECK(5, 8, 0))
|
||||||
|
s << qulonglong(value.toDateTime().toSecsSinceEpoch());
|
||||||
|
#else
|
||||||
s << qulonglong(value.toDateTime().toTime_t());
|
s << qulonglong(value.toDateTime().toTime_t());
|
||||||
|
#endif
|
||||||
break;
|
break;
|
||||||
case QAmqpMetaType::Hash:
|
case QAmqpMetaType::Hash:
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -47,7 +47,7 @@
|
||||||
#define AMQP_BASIC_APP_ID_FLAG (1 << 3)
|
#define AMQP_BASIC_APP_ID_FLAG (1 << 3)
|
||||||
#define AMQP_BASIC_CLUSTER_ID_FLAG (1 << 2)
|
#define AMQP_BASIC_CLUSTER_ID_FLAG (1 << 2)
|
||||||
|
|
||||||
#define QAMQP_VERSION "0.5.0"
|
#define QAMQP_VERSION "0.6.0"
|
||||||
|
|
||||||
#define AMQP_CONNECTION_FORCED 320
|
#define AMQP_CONNECTION_FORCED 320
|
||||||
|
|
||||||
|
|
@ -61,7 +61,7 @@
|
||||||
# define QAMQP_EXPORT
|
# define QAMQP_EXPORT
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define qAmqpDebug if (qgetenv("QAMQP_DEBUG").isEmpty()); else qDebug
|
#define qAmqpDebug if (qEnvironmentVariableIsEmpty("QAMQP_DEBUG")); else qDebug
|
||||||
|
|
||||||
namespace QAmqpMetaType {
|
namespace QAmqpMetaType {
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -113,13 +113,6 @@ QHash<QString, QVariant> QAmqpMessage::headers() const
|
||||||
return d->headers;
|
return d->headers;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if QT_VERSION < 0x050000
|
|
||||||
bool QAmqpMessage::isDetached() const
|
|
||||||
{
|
|
||||||
return d && d->ref == 1;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
uint qHash(const QAmqpMessage &message, uint seed)
|
uint qHash(const QAmqpMessage &message, uint seed)
|
||||||
{
|
{
|
||||||
Q_UNUSED(seed);
|
Q_UNUSED(seed);
|
||||||
|
|
|
||||||
|
|
@ -34,9 +34,7 @@ public:
|
||||||
QAmqpMessage &operator=(const QAmqpMessage &other);
|
QAmqpMessage &operator=(const QAmqpMessage &other);
|
||||||
~QAmqpMessage();
|
~QAmqpMessage();
|
||||||
|
|
||||||
#if QT_VERSION >= 0x050000
|
|
||||||
inline void swap(QAmqpMessage &other) { qSwap(d, other.d); }
|
inline void swap(QAmqpMessage &other) { qSwap(d, other.d); }
|
||||||
#endif
|
|
||||||
|
|
||||||
bool operator==(const QAmqpMessage &message) const;
|
bool operator==(const QAmqpMessage &message) const;
|
||||||
inline bool operator!=(const QAmqpMessage &message) const { return !(operator==(message)); }
|
inline bool operator!=(const QAmqpMessage &message) const { return !(operator==(message)); }
|
||||||
|
|
@ -80,22 +78,9 @@ private:
|
||||||
QSharedDataPointer<QAmqpMessagePrivate> d;
|
QSharedDataPointer<QAmqpMessagePrivate> d;
|
||||||
friend class QAmqpQueuePrivate;
|
friend class QAmqpQueuePrivate;
|
||||||
friend class QAmqpQueue;
|
friend class QAmqpQueue;
|
||||||
|
|
||||||
#if QT_VERSION < 0x050000
|
|
||||||
public:
|
|
||||||
typedef QSharedDataPointer<QAmqpMessagePrivate> DataPtr;
|
|
||||||
inline DataPtr &data_ptr() { return d; }
|
|
||||||
|
|
||||||
// internal
|
|
||||||
bool isDetached() const;
|
|
||||||
#endif
|
|
||||||
};
|
};
|
||||||
|
|
||||||
Q_DECLARE_METATYPE(QAmqpMessage::PropertyHash)
|
Q_DECLARE_METATYPE(QAmqpMessage::PropertyHash)
|
||||||
|
|
||||||
#if QT_VERSION < 0x050000
|
|
||||||
Q_DECLARE_TYPEINFO(QAmqpMessage, Q_MOVABLE_TYPE);
|
|
||||||
#endif
|
|
||||||
Q_DECLARE_SHARED(QAmqpMessage)
|
Q_DECLARE_SHARED(QAmqpMessage)
|
||||||
|
|
||||||
// NOTE: needed only for MSVC support, don't depend on this hash
|
// NOTE: needed only for MSVC support, don't depend on this hash
|
||||||
|
|
|
||||||
|
|
@ -611,7 +611,7 @@ void QAmqpQueue::ack(qlonglong deliveryTag, bool multiple)
|
||||||
|
|
||||||
void QAmqpQueue::reject(const QAmqpMessage &message, bool requeue)
|
void QAmqpQueue::reject(const QAmqpMessage &message, bool requeue)
|
||||||
{
|
{
|
||||||
ack(message.deliveryTag(), requeue);
|
reject(message.deliveryTag(), requeue);
|
||||||
}
|
}
|
||||||
|
|
||||||
void QAmqpQueue::reject(qlonglong deliveryTag, bool requeue)
|
void QAmqpQueue::reject(qlonglong deliveryTag, bool requeue)
|
||||||
|
|
|
||||||
|
|
@ -32,12 +32,8 @@ class QAmqpQueuePrivate;
|
||||||
class QAMQP_EXPORT QAmqpQueue : public QAmqpChannel, public QQueue<QAmqpMessage>
|
class QAMQP_EXPORT QAmqpQueue : public QAmqpChannel, public QQueue<QAmqpMessage>
|
||||||
{
|
{
|
||||||
Q_OBJECT
|
Q_OBJECT
|
||||||
Q_ENUMS(QueueOptions)
|
|
||||||
Q_PROPERTY(int options READ options CONSTANT)
|
Q_PROPERTY(int options READ options CONSTANT)
|
||||||
Q_PROPERTY(QString consumerTag READ consumerTag WRITE setConsumerTag)
|
Q_PROPERTY(QString consumerTag READ consumerTag WRITE setConsumerTag)
|
||||||
Q_ENUMS(QueueOption)
|
|
||||||
Q_ENUMS(ConsumeOption)
|
|
||||||
Q_ENUMS(RemoveOption)
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
enum QueueOption {
|
enum QueueOption {
|
||||||
|
|
@ -49,6 +45,8 @@ public:
|
||||||
NoWait = 0x10
|
NoWait = 0x10
|
||||||
};
|
};
|
||||||
Q_DECLARE_FLAGS(QueueOptions, QueueOption)
|
Q_DECLARE_FLAGS(QueueOptions, QueueOption)
|
||||||
|
Q_ENUM(QueueOption)
|
||||||
|
Q_ENUM(QueueOptions)
|
||||||
int options() const;
|
int options() const;
|
||||||
|
|
||||||
enum ConsumeOption {
|
enum ConsumeOption {
|
||||||
|
|
@ -58,6 +56,7 @@ public:
|
||||||
coNoWait = 0x08
|
coNoWait = 0x08
|
||||||
};
|
};
|
||||||
Q_DECLARE_FLAGS(ConsumeOptions, ConsumeOption)
|
Q_DECLARE_FLAGS(ConsumeOptions, ConsumeOption)
|
||||||
|
Q_ENUM(ConsumeOption)
|
||||||
|
|
||||||
enum RemoveOption {
|
enum RemoveOption {
|
||||||
roForce = 0x0,
|
roForce = 0x0,
|
||||||
|
|
@ -66,6 +65,7 @@ public:
|
||||||
roNoWait = 0x04
|
roNoWait = 0x04
|
||||||
};
|
};
|
||||||
Q_DECLARE_FLAGS(RemoveOptions, RemoveOption)
|
Q_DECLARE_FLAGS(RemoveOptions, RemoveOption)
|
||||||
|
Q_ENUM(RemoveOption)
|
||||||
|
|
||||||
~QAmqpQueue();
|
~QAmqpQueue();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
#include <QDateTime>
|
#include <QDateTime>
|
||||||
#include <QDebug>
|
#include <QDebug>
|
||||||
|
#include <QIODevice>
|
||||||
|
|
||||||
#include "qamqpframe_p.h"
|
#include "qamqpframe_p.h"
|
||||||
#include "qamqptable.h"
|
#include "qamqptable.h"
|
||||||
|
|
|
||||||
16
src/src.pro
16
src/src.pro
|
|
@ -64,16 +64,12 @@ HEADERS += \
|
||||||
$${INSTALL_HEADERS} \
|
$${INSTALL_HEADERS} \
|
||||||
$${PRIVATE_HEADERS}
|
$${PRIVATE_HEADERS}
|
||||||
|
|
||||||
SOURCES += \
|
SOURCES += $$files($$PWD/*.cpp)
|
||||||
qamqpauthenticator.cpp \
|
|
||||||
qamqpchannel.cpp \
|
DISTFILES += \
|
||||||
qamqpchannelhash.cpp \
|
../.travis.yml \
|
||||||
qamqpclient.cpp \
|
../LICENSE \
|
||||||
qamqpexchange.cpp \
|
../README.md
|
||||||
qamqpframe.cpp \
|
|
||||||
qamqpmessage.cpp \
|
|
||||||
qamqpqueue.cpp \
|
|
||||||
qamqptable.cpp
|
|
||||||
|
|
||||||
# install
|
# install
|
||||||
headers.files = $${INSTALL_HEADERS}
|
headers.files = $${INSTALL_HEADERS}
|
||||||
|
|
|
||||||
|
|
@ -179,7 +179,7 @@ void tst_QAMQPClient::tune()
|
||||||
|
|
||||||
client.connectToHost();
|
client.connectToHost();
|
||||||
QVERIFY(waitForSignal(&client, SIGNAL(connected())));
|
QVERIFY(waitForSignal(&client, SIGNAL(connected())));
|
||||||
QCOMPARE((int)client.channelMax(), 15);
|
QCOMPARE((int) client.channelMax(), 2047);
|
||||||
QCOMPARE((int)client.heartbeatDelay(), 600);
|
QCOMPARE((int)client.heartbeatDelay(), 600);
|
||||||
QCOMPARE((int)client.frameMax(), 5000);
|
QCOMPARE((int)client.frameMax(), 5000);
|
||||||
|
|
||||||
|
|
@ -191,7 +191,7 @@ void tst_QAMQPClient::socketError()
|
||||||
{
|
{
|
||||||
QAmqpClient client;
|
QAmqpClient client;
|
||||||
client.connectToHost("amqp://127.0.0.1:56725/");
|
client.connectToHost("amqp://127.0.0.1:56725/");
|
||||||
QVERIFY(waitForSignal(&client, SIGNAL(socketError(QAbstractSocket::SocketError))));
|
QVERIFY(waitForSignal(&client, SIGNAL(socketErrorOccurred(QAbstractSocket::SocketError))));
|
||||||
QCOMPARE(client.socketError(), QAbstractSocket::ConnectionRefusedError);
|
QCOMPARE(client.socketError(), QAbstractSocket::ConnectionRefusedError);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -208,10 +208,8 @@ void tst_QAMQPClient::validateUri_data()
|
||||||
<< "guest" << "guest" << "192.168.1.10" << quint16(5672) << "/";
|
<< "guest" << "guest" << "192.168.1.10" << quint16(5672) << "/";
|
||||||
QTest::newRow("standard") << "amqp://user:pass@host:10000/vhost"
|
QTest::newRow("standard") << "amqp://user:pass@host:10000/vhost"
|
||||||
<< "user" << "pass" << "host" << quint16(10000) << "vhost";
|
<< "user" << "pass" << "host" << quint16(10000) << "vhost";
|
||||||
#if QT_VERSION >= 0x040806
|
|
||||||
QTest::newRow("urlencoded") << "amqp://user%61:%61pass@ho%61st:10000/v%2fhost"
|
QTest::newRow("urlencoded") << "amqp://user%61:%61pass@ho%61st:10000/v%2fhost"
|
||||||
<< "usera" << "apass" << "hoast" << quint16(10000) << "v/host";
|
<< "usera" << "apass" << "hoast" << quint16(10000) << "v/host";
|
||||||
#endif
|
|
||||||
QTest::newRow("empty") << "amqp://" << "" << "" << "" << quint16(AMQP_PORT) << "";
|
QTest::newRow("empty") << "amqp://" << "" << "" << "" << quint16(AMQP_PORT) << "";
|
||||||
QTest::newRow("empty2") << "amqp://:@/" << "" << "" << "" << quint16(AMQP_PORT) << "/";
|
QTest::newRow("empty2") << "amqp://:@/" << "" << "" << "" << quint16(AMQP_PORT) << "/";
|
||||||
QTest::newRow("onlyuser") << "amqp://user@" << "user" << "" << "" << quint16(AMQP_PORT) << "";
|
QTest::newRow("onlyuser") << "amqp://user@" << "user" << "" << "" << quint16(AMQP_PORT) << "";
|
||||||
|
|
|
||||||
|
|
@ -356,7 +356,7 @@ void tst_QAMQPQueue::canOnlyStartConsumingOnce()
|
||||||
void tst_QAMQPQueue::ensureConsumeOnlySentOnce()
|
void tst_QAMQPQueue::ensureConsumeOnlySentOnce()
|
||||||
{
|
{
|
||||||
QAmqpQueue *queue = client->createQueue("test-single-consumer");
|
QAmqpQueue *queue = client->createQueue("test-single-consumer");
|
||||||
SignalSpy spy(queue, SIGNAL(consuming(QString)));
|
QSignalSpy spy(queue, SIGNAL(consuming(QString)));
|
||||||
queue->declare();
|
queue->declare();
|
||||||
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
||||||
|
|
||||||
|
|
@ -601,8 +601,13 @@ void tst_QAMQPQueue::tableFieldDataTypes()
|
||||||
QCOMPARE(message.header("double").toDouble(), double(FLT_MAX));
|
QCOMPARE(message.header("double").toDouble(), double(FLT_MAX));
|
||||||
QCOMPARE(message.header("short-string").toString(), QLatin1String("test"));
|
QCOMPARE(message.header("short-string").toString(), QLatin1String("test"));
|
||||||
QCOMPARE(message.header("long-string").toString(), QLatin1String("test"));
|
QCOMPARE(message.header("long-string").toString(), QLatin1String("test"));
|
||||||
QCOMPARE(message.header("timestamp").toDateTime().toTime_t(), timestamp.toTime_t());
|
|
||||||
QCOMPARE(message.header("bytes").toByteArray(), QByteArray("abcdefg1234567"));
|
QCOMPARE(message.header("bytes").toByteArray(), QByteArray("abcdefg1234567"));
|
||||||
|
#if (QT_VERSION >= QT_VERSION_CHECK(5, 8, 0))
|
||||||
|
QCOMPARE(message.header("timestamp").toDateTime().toSecsSinceEpoch(),
|
||||||
|
timestamp.toSecsSinceEpoch());
|
||||||
|
#else
|
||||||
|
QCOMPARE(message.header("timestamp").toDateTime().toTime_t(), timestamp.toTime_t());
|
||||||
|
#endif
|
||||||
|
|
||||||
QVERIFY(message.hasHeader("nested-table"));
|
QVERIFY(message.hasHeader("nested-table"));
|
||||||
QAmqpTable compareTable(message.header("nested-table").toHash());
|
QAmqpTable compareTable(message.header("nested-table").toHash());
|
||||||
|
|
@ -654,11 +659,17 @@ void tst_QAMQPQueue::messageProperties()
|
||||||
QCOMPARE(message.property(QAmqpMessage::ReplyTo).toString(), QLatin1String("another-queue"));
|
QCOMPARE(message.property(QAmqpMessage::ReplyTo).toString(), QLatin1String("another-queue"));
|
||||||
QCOMPARE(message.property(QAmqpMessage::MessageId).toString(), QLatin1String("some-message-id"));
|
QCOMPARE(message.property(QAmqpMessage::MessageId).toString(), QLatin1String("some-message-id"));
|
||||||
QCOMPARE(message.property(QAmqpMessage::Expiration).toString(), QLatin1String("60000"));
|
QCOMPARE(message.property(QAmqpMessage::Expiration).toString(), QLatin1String("60000"));
|
||||||
QCOMPARE(message.property(QAmqpMessage::Timestamp).toDateTime().toTime_t(), timestamp.toTime_t());
|
|
||||||
QCOMPARE(message.property(QAmqpMessage::Type).toString(), QLatin1String("some-message-type"));
|
QCOMPARE(message.property(QAmqpMessage::Type).toString(), QLatin1String("some-message-type"));
|
||||||
QCOMPARE(message.property(QAmqpMessage::UserId).toString(), QLatin1String("guest"));
|
QCOMPARE(message.property(QAmqpMessage::UserId).toString(), QLatin1String("guest"));
|
||||||
QCOMPARE(message.property(QAmqpMessage::AppId).toString(), QLatin1String("some-app-id"));
|
QCOMPARE(message.property(QAmqpMessage::AppId).toString(), QLatin1String("some-app-id"));
|
||||||
QCOMPARE(message.property(QAmqpMessage::ClusterID).toString(), QLatin1String("some-cluster-id"));
|
QCOMPARE(message.property(QAmqpMessage::ClusterID).toString(), QLatin1String("some-cluster-id"));
|
||||||
|
#if (QT_VERSION >= QT_VERSION_CHECK(5, 8, 0))
|
||||||
|
QCOMPARE(message.property(QAmqpMessage::Timestamp).toDateTime().toSecsSinceEpoch(),
|
||||||
|
timestamp.toSecsSinceEpoch());
|
||||||
|
#else
|
||||||
|
QCOMPARE(message.property(QAmqpMessage::Timestamp).toDateTime().toTime_t(),
|
||||||
|
timestamp.toTime_t());
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
void tst_QAMQPQueue::emptyMessage()
|
void tst_QAMQPQueue::emptyMessage()
|
||||||
|
|
|
||||||
|
|
@ -17,127 +17,4 @@ bool waitForSignal(QObject *obj, const char *signal, int delay)
|
||||||
|
|
||||||
} // namespace Test
|
} // namespace Test
|
||||||
} // namespace QAMQP
|
} // namespace QAMQP
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#if QT_VERSION >= 0x050000
|
|
||||||
typedef QSignalSpy SignalSpy;
|
|
||||||
#else
|
|
||||||
#include <QtCore/qbytearray.h>
|
|
||||||
#include <QtCore/qlist.h>
|
|
||||||
#include <QtCore/qobject.h>
|
|
||||||
#include <QtCore/qmetaobject.h>
|
|
||||||
#include <QtCore/qvariant.h>
|
|
||||||
#include <QtCore/qvector.h>
|
|
||||||
#include <QtTest/qtesteventloop.h>
|
|
||||||
|
|
||||||
class QVariant;
|
|
||||||
class SignalSpy: public QObject, public QList<QList<QVariant> >
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
explicit SignalSpy(const QObject *obj, const char *aSignal)
|
|
||||||
: m_waiting(false)
|
|
||||||
{
|
|
||||||
static const int memberOffset = QObject::staticMetaObject.methodCount();
|
|
||||||
if (!obj) {
|
|
||||||
qWarning("SignalSpy: Cannot spy on a null object");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!aSignal) {
|
|
||||||
qWarning("SignalSpy: Null signal name is not valid");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (((aSignal[0] - '0') & 0x03) != QSIGNAL_CODE) {
|
|
||||||
qWarning("SignalSpy: Not a valid signal, use the SIGNAL macro");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const QByteArray ba = QMetaObject::normalizedSignature(aSignal + 1);
|
|
||||||
const QMetaObject * const mo = obj->metaObject();
|
|
||||||
const int sigIndex = mo->indexOfMethod(ba.constData());
|
|
||||||
if (sigIndex < 0) {
|
|
||||||
qWarning("SignalSpy: No such signal: '%s'", ba.constData());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!QMetaObject::connect(obj, sigIndex, this, memberOffset,
|
|
||||||
Qt::DirectConnection, 0)) {
|
|
||||||
qWarning("SignalSpy: QMetaObject::connect returned false. Unable to connect.");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
sig = ba;
|
|
||||||
initArgs(mo->method(sigIndex), obj);
|
|
||||||
}
|
|
||||||
|
|
||||||
inline bool isValid() const { return !sig.isEmpty(); }
|
|
||||||
inline QByteArray signal() const { return sig; }
|
|
||||||
|
|
||||||
bool wait(int timeout = 5)
|
|
||||||
{
|
|
||||||
Q_ASSERT(!m_waiting);
|
|
||||||
const int origCount = count();
|
|
||||||
m_waiting = true;
|
|
||||||
m_loop.enterLoop(timeout);
|
|
||||||
m_waiting = false;
|
|
||||||
return count() > origCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
int qt_metacall(QMetaObject::Call call, int methodId, void **a)
|
|
||||||
{
|
|
||||||
methodId = QObject::qt_metacall(call, methodId, a);
|
|
||||||
if (methodId < 0)
|
|
||||||
return methodId;
|
|
||||||
|
|
||||||
if (call == QMetaObject::InvokeMetaMethod) {
|
|
||||||
if (methodId == 0) {
|
|
||||||
appendArgs(a);
|
|
||||||
}
|
|
||||||
--methodId;
|
|
||||||
}
|
|
||||||
return methodId;
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
void initArgs(const QMetaMethod &member)
|
|
||||||
{
|
|
||||||
initArgs(member, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
void initArgs(const QMetaMethod &member, const QObject *obj)
|
|
||||||
{
|
|
||||||
Q_UNUSED(obj)
|
|
||||||
QList<QByteArray> params = member.parameterTypes();
|
|
||||||
for (int i = 0; i < params.count(); ++i) {
|
|
||||||
int tp = QMetaType::type(params.at(i).constData());
|
|
||||||
if (tp == QMetaType::Void)
|
|
||||||
qWarning("Don't know how to handle '%s', use qRegisterMetaType to register it.",
|
|
||||||
params.at(i).constData());
|
|
||||||
args << tp;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void appendArgs(void **a)
|
|
||||||
{
|
|
||||||
QList<QVariant> list;
|
|
||||||
for (int i = 0; i < args.count(); ++i) {
|
|
||||||
QMetaType::Type type = static_cast<QMetaType::Type>(args.at(i));
|
|
||||||
list << QVariant(type, a[i + 1]);
|
|
||||||
}
|
|
||||||
append(list);
|
|
||||||
|
|
||||||
if (m_waiting)
|
|
||||||
m_loop.exitLoop();
|
|
||||||
}
|
|
||||||
|
|
||||||
// the full, normalized signal name
|
|
||||||
QByteArray sig;
|
|
||||||
// holds the QMetaType types for the argument list of the signal
|
|
||||||
QVector<int> args;
|
|
||||||
|
|
||||||
QTestEventLoop m_loop;
|
|
||||||
bool m_waiting;
|
|
||||||
};
|
|
||||||
#endif
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue