[+] Authenticator - authentication mechanism abstraction
[+] AMQPLAIN authentication Authenticator implement
This commit is contained in:
parent
9874e0af26
commit
267c9801f0
|
|
@ -166,6 +166,10 @@
|
||||||
RelativePath=".\src\qamqp\amqp.cpp"
|
RelativePath=".\src\qamqp\amqp.cpp"
|
||||||
>
|
>
|
||||||
</File>
|
</File>
|
||||||
|
<File
|
||||||
|
RelativePath=".\src\qamqp\amqp_authenticator.cpp"
|
||||||
|
>
|
||||||
|
</File>
|
||||||
<File
|
<File
|
||||||
RelativePath=".\src\qamqp\amqp_channel.cpp"
|
RelativePath=".\src\qamqp\amqp_channel.cpp"
|
||||||
>
|
>
|
||||||
|
|
@ -230,6 +234,10 @@
|
||||||
/>
|
/>
|
||||||
</FileConfiguration>
|
</FileConfiguration>
|
||||||
</File>
|
</File>
|
||||||
|
<File
|
||||||
|
RelativePath=".\src\qamqp\amqp_authenticator.h"
|
||||||
|
>
|
||||||
|
</File>
|
||||||
<File
|
<File
|
||||||
RelativePath=".\src\qamqp\amqp_channel.h"
|
RelativePath=".\src\qamqp\amqp_channel.h"
|
||||||
>
|
>
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@
|
||||||
#include "qamqp_global.h"
|
#include "qamqp_global.h"
|
||||||
#include "amqp_exchange.h"
|
#include "amqp_exchange.h"
|
||||||
#include "amqp_queue.h"
|
#include "amqp_queue.h"
|
||||||
|
#include "amqp_authenticator.h"
|
||||||
|
|
||||||
using namespace QAMQP;
|
using namespace QAMQP;
|
||||||
|
|
||||||
|
|
@ -31,8 +32,6 @@ ClientPrivate::ClientPrivate(int version )
|
||||||
, port(AMQPPORT)
|
, port(AMQPPORT)
|
||||||
, host(QString::fromLatin1(AMQPHOST))
|
, host(QString::fromLatin1(AMQPHOST))
|
||||||
, virtualHost(QString::fromLatin1(AMQPVHOST))
|
, virtualHost(QString::fromLatin1(AMQPVHOST))
|
||||||
, user(QString::fromLatin1(AMQPLOGIN))
|
|
||||||
, password(QString::fromLatin1(AMQPPSWD))
|
|
||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -55,6 +54,8 @@ void ClientPrivate::init(QObject * parent)
|
||||||
connection_ = new QAMQP::Connection(q_func());
|
connection_ = new QAMQP::Connection(q_func());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setAuth(new AMQPlainAuthenticator(QString::fromLatin1(AMQPLOGIN), QString::fromLatin1(AMQPPSWD)));
|
||||||
|
|
||||||
QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)),
|
QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)),
|
||||||
connection_, SLOT(_q_method(const QAMQP::Frame::Method &)));
|
connection_, SLOT(_q_method(const QAMQP::Frame::Method &)));
|
||||||
}
|
}
|
||||||
|
|
@ -66,14 +67,27 @@ void ClientPrivate::init(QObject * parent, const QUrl & con)
|
||||||
ClientPrivate::connect();
|
ClientPrivate::connect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void ClientPrivate::setAuth( Authenticator* auth )
|
||||||
|
{
|
||||||
|
auth_ = QSharedPointer<Authenticator>(auth);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void ClientPrivate::printConnect() const
|
void ClientPrivate::printConnect() const
|
||||||
{
|
{
|
||||||
QTextStream stream(stdout);
|
QTextStream stream(stdout);
|
||||||
stream << "port = " << port << endl;
|
stream << "port = " << port << endl;
|
||||||
stream << "host = " << host << endl;
|
stream << "host = " << host << endl;
|
||||||
stream << "vhost = " << virtualHost << endl;
|
stream << "vhost = " << virtualHost << endl;
|
||||||
stream << "user = " << user << endl;
|
|
||||||
stream << "passw = " << password << endl;
|
if(auth_ && auth_->type() == "AMQPLAIN")
|
||||||
|
{
|
||||||
|
QSharedPointer<AMQPlainAuthenticator> a = auth_.staticCast<AMQPlainAuthenticator>();
|
||||||
|
stream << "user = " << a->login() << endl;
|
||||||
|
stream << "passw = " << a->password() << endl;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClientPrivate::connect()
|
void ClientPrivate::connect()
|
||||||
|
|
@ -221,22 +235,48 @@ void QAMQP::Client::setVirtualHost( const QString & virtualHost )
|
||||||
|
|
||||||
QString QAMQP::Client::user() const
|
QString QAMQP::Client::user() const
|
||||||
{
|
{
|
||||||
return d_func()->user;
|
const Authenticator * auth = d_func()->auth_.data();
|
||||||
|
|
||||||
|
if(auth && auth->type() == "AMQPLAIN")
|
||||||
|
{
|
||||||
|
const AMQPlainAuthenticator * a = static_cast<const AMQPlainAuthenticator *>(auth);
|
||||||
|
return a->login();
|
||||||
|
}
|
||||||
|
return QString();
|
||||||
}
|
}
|
||||||
|
|
||||||
void QAMQP::Client::setUser( const QString & user )
|
void QAMQP::Client::setUser( const QString & user )
|
||||||
{
|
{
|
||||||
d_func()->user = user;
|
Authenticator * auth = d_func()->auth_.data();
|
||||||
|
|
||||||
|
if(auth && auth->type() == "AMQPLAIN")
|
||||||
|
{
|
||||||
|
AMQPlainAuthenticator * a = static_cast<AMQPlainAuthenticator *>(auth);
|
||||||
|
a->setLogin(user);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
QString QAMQP::Client::password() const
|
QString QAMQP::Client::password() const
|
||||||
{
|
{
|
||||||
return d_func()->password;
|
const Authenticator * auth = d_func()->auth_.data();
|
||||||
|
|
||||||
|
if(auth && auth->type() == "AMQPLAIN")
|
||||||
|
{
|
||||||
|
const AMQPlainAuthenticator * a = static_cast<const AMQPlainAuthenticator *>(auth);
|
||||||
|
return a->password();
|
||||||
|
}
|
||||||
|
return QString();
|
||||||
}
|
}
|
||||||
|
|
||||||
void QAMQP::Client::setPassword( const QString & password )
|
void QAMQP::Client::setPassword( const QString & password )
|
||||||
{
|
{
|
||||||
d_func()->password = password;
|
Authenticator * auth = d_func()->auth_.data();
|
||||||
|
|
||||||
|
if(auth && auth->type() == "AMQPLAIN")
|
||||||
|
{
|
||||||
|
AMQPlainAuthenticator * a = static_cast<AMQPlainAuthenticator *>(auth);
|
||||||
|
a->setPassword(password);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void QAMQP::Client::printConnect() const
|
void QAMQP::Client::printConnect() const
|
||||||
|
|
@ -292,3 +332,13 @@ void QAMQP::Client::reopen()
|
||||||
return d_func()->connect();
|
return d_func()->connect();
|
||||||
return d_func()->disconnect();
|
return d_func()->disconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void QAMQP::Client::setAuth( Authenticator * auth )
|
||||||
|
{
|
||||||
|
d_func()->setAuth(auth);
|
||||||
|
}
|
||||||
|
|
||||||
|
Authenticator * QAMQP::Client::auth() const
|
||||||
|
{
|
||||||
|
return d_func()->auth_.data();
|
||||||
|
}
|
||||||
|
|
@ -9,6 +9,7 @@ namespace QAMQP
|
||||||
class Exchange;
|
class Exchange;
|
||||||
class Queue;
|
class Queue;
|
||||||
class ClientPrivate;
|
class ClientPrivate;
|
||||||
|
class Authenticator;
|
||||||
class ConnectionPrivate;
|
class ConnectionPrivate;
|
||||||
class Client : public QObject
|
class Client : public QObject
|
||||||
{
|
{
|
||||||
|
|
@ -54,6 +55,8 @@ namespace QAMQP
|
||||||
QString password() const;
|
QString password() const;
|
||||||
void setPassword(const QString & password);
|
void setPassword(const QString & password);
|
||||||
|
|
||||||
|
void setAuth(Authenticator * auth);
|
||||||
|
Authenticator * auth() const;
|
||||||
void open();
|
void open();
|
||||||
void open(const QUrl & connectionString);
|
void open(const QUrl & connectionString);
|
||||||
void close();
|
void close();
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,47 @@
|
||||||
|
#include "amqp_authenticator.h"
|
||||||
|
#include "amqp_frame.h"
|
||||||
|
|
||||||
|
QString QAMQP::AMQPlainAuthenticator::login() const
|
||||||
|
{
|
||||||
|
return login_;
|
||||||
|
}
|
||||||
|
|
||||||
|
QString QAMQP::AMQPlainAuthenticator::password() const
|
||||||
|
{
|
||||||
|
return password_;
|
||||||
|
}
|
||||||
|
|
||||||
|
QAMQP::AMQPlainAuthenticator::AMQPlainAuthenticator( const QString & l /*= QString()*/, const QString & p /*= QString()*/ )
|
||||||
|
{
|
||||||
|
login_ = l;
|
||||||
|
password_ = p;
|
||||||
|
}
|
||||||
|
|
||||||
|
QAMQP::AMQPlainAuthenticator::~AMQPlainAuthenticator()
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
QString QAMQP::AMQPlainAuthenticator::type() const
|
||||||
|
{
|
||||||
|
return "AMQPLAIN";
|
||||||
|
}
|
||||||
|
|
||||||
|
void QAMQP::AMQPlainAuthenticator::setLogin( const QString& l )
|
||||||
|
{
|
||||||
|
login_ = l;
|
||||||
|
}
|
||||||
|
|
||||||
|
void QAMQP::AMQPlainAuthenticator::setPassword( const QString &p )
|
||||||
|
{
|
||||||
|
password_ = p;
|
||||||
|
}
|
||||||
|
|
||||||
|
void QAMQP::AMQPlainAuthenticator::write( QDataStream & out )
|
||||||
|
{
|
||||||
|
QAMQP::Frame::writeField('s', out, type());
|
||||||
|
QAMQP::Frame::TableField response;
|
||||||
|
response["LOGIN"] = login_;
|
||||||
|
response["PASSWORD"] = password_;
|
||||||
|
QAMQP::Frame::serialize(out, response);
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,33 @@
|
||||||
|
#ifndef amqp_authenticator_h__
|
||||||
|
#define amqp_authenticator_h__
|
||||||
|
|
||||||
|
#include "qamqp_global.h"
|
||||||
|
#include <QString>
|
||||||
|
#include <QDataStream>
|
||||||
|
|
||||||
|
namespace QAMQP
|
||||||
|
{
|
||||||
|
class Authenticator
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
virtual ~Authenticator(){};
|
||||||
|
virtual QString type() const = 0;
|
||||||
|
virtual void write(QDataStream & out) = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
class AMQPlainAuthenticator : public Authenticator
|
||||||
|
{
|
||||||
|
QString login_, password_;
|
||||||
|
public:
|
||||||
|
AMQPlainAuthenticator(const QString & login = QString(), const QString & password = QString());
|
||||||
|
virtual ~AMQPlainAuthenticator();
|
||||||
|
QString login() const;
|
||||||
|
void setLogin(const QString& l);
|
||||||
|
QString password() const;
|
||||||
|
void setPassword(const QString &p);
|
||||||
|
virtual QString type() const;
|
||||||
|
virtual void write(QDataStream & out);
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
#endif // amqp_authenticator_h__
|
||||||
|
|
@ -159,11 +159,13 @@ void ChannelPrivate::init(int channelNumber, Client * parent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void ChannelPrivate::_q_method( const QAMQP::Frame::Method & frame )
|
bool ChannelPrivate::_q_method( const QAMQP::Frame::Method & frame )
|
||||||
{
|
{
|
||||||
if(frame.methodClass() != QAMQP::Frame::fcChannel
|
if(frame.channel() != number )
|
||||||
|| frame.channel() != number )
|
return true;
|
||||||
return;
|
|
||||||
|
if(frame.methodClass() != QAMQP::Frame::fcChannel)
|
||||||
|
return false;
|
||||||
|
|
||||||
qDebug("Channel#%d:", number);
|
qDebug("Channel#%d:", number);
|
||||||
|
|
||||||
|
|
@ -185,6 +187,7 @@ void ChannelPrivate::_q_method( const QAMQP::Frame::Method & frame )
|
||||||
closeOk(frame);
|
closeOk(frame);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ChannelPrivate::_q_open()
|
void ChannelPrivate::_q_open()
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ namespace QAMQP
|
||||||
};
|
};
|
||||||
|
|
||||||
ChannelPrivate(int version = QObjectPrivateVersion);
|
ChannelPrivate(int version = QObjectPrivateVersion);
|
||||||
~ChannelPrivate();
|
virtual ~ChannelPrivate();
|
||||||
|
|
||||||
void init(int channelNumber, Client * parent);
|
void init(int channelNumber, Client * parent);
|
||||||
|
|
||||||
|
|
@ -62,7 +62,7 @@ namespace QAMQP
|
||||||
void close(const QAMQP::Frame::Method & frame);
|
void close(const QAMQP::Frame::Method & frame);
|
||||||
void closeOk(const QAMQP::Frame::Method & frame);
|
void closeOk(const QAMQP::Frame::Method & frame);
|
||||||
|
|
||||||
virtual void _q_method(const QAMQP::Frame::Method & frame);
|
virtual bool _q_method(const QAMQP::Frame::Method & frame);
|
||||||
void _q_open();
|
void _q_open();
|
||||||
|
|
||||||
void setQOS(qint32 prefetchSize, quint16 prefetchCount);
|
void setQOS(qint32 prefetchSize, quint16 prefetchCount);
|
||||||
|
|
|
||||||
|
|
@ -57,16 +57,13 @@ void ConnectionPrivate::startOk()
|
||||||
QDataStream stream(&arguments_, QIODevice::WriteOnly);
|
QDataStream stream(&arguments_, QIODevice::WriteOnly);
|
||||||
|
|
||||||
QAMQP::Frame::TableField clientProperties;
|
QAMQP::Frame::TableField clientProperties;
|
||||||
clientProperties["version"] = "0.0.1";
|
clientProperties["version"] = "0.0.3";
|
||||||
clientProperties["platform"] = QString("Qt %1").arg(qVersion());
|
clientProperties["platform"] = QString("Qt %1").arg(qVersion());
|
||||||
clientProperties["product"] = "QAMQP";
|
clientProperties["product"] = "QAMQP";
|
||||||
QAMQP::Frame::serialize(stream, clientProperties);
|
QAMQP::Frame::serialize(stream, clientProperties);
|
||||||
|
|
||||||
QAMQP::Frame::writeField('s', stream, "AMQPLAIN");
|
client_->d_func()->auth_->write(stream);
|
||||||
QAMQP::Frame::TableField response;
|
|
||||||
response["LOGIN"] = client_->user();
|
|
||||||
response["PASSWORD"] = client_->password();
|
|
||||||
QAMQP::Frame::serialize(stream, response);
|
|
||||||
QAMQP::Frame::writeField('s', stream, "en_US");
|
QAMQP::Frame::writeField('s', stream, "en_US");
|
||||||
|
|
||||||
frame.setArguments(arguments_);
|
frame.setArguments(arguments_);
|
||||||
|
|
@ -234,10 +231,10 @@ void ConnectionPrivate::setQOS( qint32 prefetchSize, quint16 prefetchCount, int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame )
|
bool ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame )
|
||||||
{
|
{
|
||||||
if(frame.methodClass() != QAMQP::Frame::fcConnection)
|
if(frame.methodClass() != QAMQP::Frame::fcConnection)
|
||||||
return;
|
return true;
|
||||||
|
|
||||||
qDebug() << "Connection:";
|
qDebug() << "Connection:";
|
||||||
|
|
||||||
|
|
@ -245,7 +242,7 @@ void ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame )
|
||||||
{
|
{
|
||||||
if( frame.id() == miCloseOk)
|
if( frame.id() == miCloseOk)
|
||||||
closeOk(frame);
|
closeOk(frame);
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
switch(MethodId(frame.id()))
|
switch(MethodId(frame.id()))
|
||||||
|
|
@ -270,8 +267,9 @@ void ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame )
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
qWarning("Unknown method-id %d", frame.id());
|
qWarning("Unknown method-id %d", frame.id());
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////
|
||||||
|
|
|
||||||
|
|
@ -38,7 +38,7 @@ namespace QAMQP
|
||||||
void openOk(const QAMQP::Frame::Method & frame);
|
void openOk(const QAMQP::Frame::Method & frame);
|
||||||
void close(const QAMQP::Frame::Method & frame);
|
void close(const QAMQP::Frame::Method & frame);
|
||||||
void closeOk(const QAMQP::Frame::Method & frame);
|
void closeOk(const QAMQP::Frame::Method & frame);
|
||||||
void _q_method(const QAMQP::Frame::Method & frame);
|
bool _q_method(const QAMQP::Frame::Method & frame);
|
||||||
|
|
||||||
void setQOS(qint32 prefetchSize, quint16 prefetchCount, int channel, bool global);
|
void setQOS(qint32 prefetchSize, quint16 prefetchCount, int channel, bool global);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -128,12 +128,13 @@ ExchangePrivate::~ExchangePrivate()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void ExchangePrivate::_q_method( const QAMQP::Frame::Method & frame )
|
bool ExchangePrivate::_q_method( const QAMQP::Frame::Method & frame )
|
||||||
{
|
{
|
||||||
ChannelPrivate::_q_method(frame);
|
if(ChannelPrivate::_q_method(frame))
|
||||||
if(frame.methodClass() != QAMQP::Frame::fcExchange
|
return true;
|
||||||
|| frame.channel() != number )
|
|
||||||
return;
|
if(frame.methodClass() != QAMQP::Frame::fcExchange)
|
||||||
|
return false;
|
||||||
|
|
||||||
switch(frame.id())
|
switch(frame.id())
|
||||||
{
|
{
|
||||||
|
|
@ -146,6 +147,7 @@ void ExchangePrivate::_q_method( const QAMQP::Frame::Method & frame )
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ExchangePrivate::declareOk( const QAMQP::Frame::Method & frame )
|
void ExchangePrivate::declareOk( const QAMQP::Frame::Method & frame )
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ namespace QAMQP
|
||||||
Exchange::ExchangeOptions options;
|
Exchange::ExchangeOptions options;
|
||||||
TableField arguments;
|
TableField arguments;
|
||||||
|
|
||||||
void _q_method(const QAMQP::Frame::Method & frame);
|
bool _q_method(const QAMQP::Frame::Method & frame);
|
||||||
|
|
||||||
bool deleyedDeclare;
|
bool deleyedDeclare;
|
||||||
bool declared;
|
bool declared;
|
||||||
|
|
|
||||||
|
|
@ -9,8 +9,13 @@ namespace QAMQP
|
||||||
{
|
{
|
||||||
Message()
|
Message()
|
||||||
{
|
{
|
||||||
|
qDebug("Message create");
|
||||||
leftSize = 0;
|
leftSize = 0;
|
||||||
}
|
}
|
||||||
|
~Message()
|
||||||
|
{
|
||||||
|
qDebug("Message release");
|
||||||
|
}
|
||||||
typedef QAMQP::Frame::Content::Property MessageProperty;
|
typedef QAMQP::Frame::Content::Property MessageProperty;
|
||||||
Q_DECLARE_FLAGS(MessageProperties, MessageProperty);
|
Q_DECLARE_FLAGS(MessageProperties, MessageProperty);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,9 +2,11 @@
|
||||||
#define qamqp_amqp_p_h__
|
#define qamqp_amqp_p_h__
|
||||||
#include <QtCore/private/qobject_p.h>
|
#include <QtCore/private/qobject_p.h>
|
||||||
|
|
||||||
|
#include <QSharedPointer>
|
||||||
|
|
||||||
#include "amqp_network.h"
|
#include "amqp_network.h"
|
||||||
#include "amqp_connection.h"
|
#include "amqp_connection.h"
|
||||||
|
#include "amqp_authenticator.h"
|
||||||
|
|
||||||
namespace QAMQP
|
namespace QAMQP
|
||||||
{
|
{
|
||||||
|
|
@ -23,17 +25,20 @@ namespace QAMQP
|
||||||
void parseCnnString( const QUrl & connectionString);
|
void parseCnnString( const QUrl & connectionString);
|
||||||
void sockConnect();
|
void sockConnect();
|
||||||
void login();
|
void login();
|
||||||
|
void setAuth(Authenticator* auth);
|
||||||
Exchange * createExchange(int channelNumber, const QString &name);
|
Exchange * createExchange(int channelNumber, const QString &name);
|
||||||
Queue * createQueue(int channelNumber, const QString &name);
|
Queue * createQueue(int channelNumber, const QString &name);
|
||||||
|
|
||||||
quint32 port;
|
quint32 port;
|
||||||
QString host;
|
QString host;
|
||||||
QString virtualHost;
|
QString virtualHost;
|
||||||
|
/*
|
||||||
QString user;
|
QString user;
|
||||||
QString password;
|
QString password;*/
|
||||||
|
|
||||||
QPointer<QAMQP::Network> network_;
|
QPointer<QAMQP::Network> network_;
|
||||||
QPointer<QAMQP::Connection> connection_;
|
QPointer<QAMQP::Connection> connection_;
|
||||||
|
QSharedPointer<Authenticator> auth_;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
#endif // amqp_p_h__
|
#endif // amqp_p_h__
|
||||||
|
|
|
||||||
|
|
@ -171,11 +171,10 @@ QueuePrivate::~QueuePrivate()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void QueuePrivate::_q_method( const QAMQP::Frame::Method & frame )
|
bool QueuePrivate::_q_method( const QAMQP::Frame::Method & frame )
|
||||||
{
|
{
|
||||||
ChannelPrivate::_q_method(frame);
|
if(ChannelPrivate::_q_method(frame))
|
||||||
if(frame.channel() != number)
|
return true;
|
||||||
return;
|
|
||||||
|
|
||||||
if(frame.methodClass() == QAMQP::Frame::fcQueue)
|
if(frame.methodClass() == QAMQP::Frame::fcQueue)
|
||||||
{
|
{
|
||||||
|
|
@ -199,6 +198,7 @@ void QueuePrivate::_q_method( const QAMQP::Frame::Method & frame )
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(frame.methodClass() == QAMQP::Frame::fcBasic)
|
if(frame.methodClass() == QAMQP::Frame::fcBasic)
|
||||||
|
|
@ -214,13 +214,17 @@ void QueuePrivate::_q_method( const QAMQP::Frame::Method & frame )
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueuePrivate::declareOk( const QAMQP::Frame::Method & frame )
|
void QueuePrivate::declareOk( const QAMQP::Frame::Method & frame )
|
||||||
{
|
{
|
||||||
|
|
||||||
qDebug() << "Declared queue: " << name;
|
qDebug() << "Declared queue: " << name;
|
||||||
declared = true;
|
declared = true;
|
||||||
|
|
||||||
|
|
@ -237,7 +241,7 @@ void QueuePrivate::declareOk( const QAMQP::Frame::Method & frame )
|
||||||
|
|
||||||
void QueuePrivate::deleteOk( const QAMQP::Frame::Method & frame )
|
void QueuePrivate::deleteOk( const QAMQP::Frame::Method & frame )
|
||||||
{
|
{
|
||||||
qDebug() << "Deleted or purged queue: " << name;
|
qDebug() << "Deleted or purged queue: " << name;
|
||||||
declared = false;
|
declared = false;
|
||||||
|
|
||||||
QByteArray data = frame.arguments();
|
QByteArray data = frame.arguments();
|
||||||
|
|
@ -257,6 +261,7 @@ void QueuePrivate::bindOk( const QAMQP::Frame::Method & frame )
|
||||||
|
|
||||||
void QueuePrivate::unbindOk( const QAMQP::Frame::Method & frame )
|
void QueuePrivate::unbindOk( const QAMQP::Frame::Method & frame )
|
||||||
{
|
{
|
||||||
|
|
||||||
qDebug() << "Unbinded queue: " << name;
|
qDebug() << "Unbinded queue: " << name;
|
||||||
QMetaObject::invokeMethod(q_func(), "binded", Q_ARG(bool, false));
|
QMetaObject::invokeMethod(q_func(), "binded", Q_ARG(bool, false));
|
||||||
}
|
}
|
||||||
|
|
@ -282,6 +287,7 @@ void QueuePrivate::declare()
|
||||||
sendFrame(frame);
|
sendFrame(frame);
|
||||||
deleyedDeclare = false;
|
deleyedDeclare = false;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueuePrivate::remove( bool ifUnused /*= true*/, bool ifEmpty /*= true*/, bool noWait /*= true*/ )
|
void QueuePrivate::remove( bool ifUnused /*= true*/, bool ifEmpty /*= true*/, bool noWait /*= true*/ )
|
||||||
|
|
@ -307,6 +313,7 @@ void QueuePrivate::remove( bool ifUnused /*= true*/, bool ifEmpty /*= true*/, bo
|
||||||
|
|
||||||
frame.setArguments(arguments_);
|
frame.setArguments(arguments_);
|
||||||
sendFrame(frame);
|
sendFrame(frame);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueuePrivate::purge()
|
void QueuePrivate::purge()
|
||||||
|
|
@ -349,6 +356,7 @@ void QueuePrivate::bind( const QString & exchangeName, const QString & key )
|
||||||
|
|
||||||
frame.setArguments(arguments_);
|
frame.setArguments(arguments_);
|
||||||
sendFrame(frame);
|
sendFrame(frame);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueuePrivate::unbind( const QString & exchangeName, const QString & key )
|
void QueuePrivate::unbind( const QString & exchangeName, const QString & key )
|
||||||
|
|
@ -394,11 +402,13 @@ void QueuePrivate::consume( Queue::ConsumeOptions options )
|
||||||
|
|
||||||
frame.setArguments(arguments_);
|
frame.setArguments(arguments_);
|
||||||
sendFrame(frame);
|
sendFrame(frame);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void QueuePrivate::consumeOk( const QAMQP::Frame::Method & frame )
|
void QueuePrivate::consumeOk( const QAMQP::Frame::Method & frame )
|
||||||
{
|
{
|
||||||
|
|
||||||
qDebug() << "Consume ok: " << name;
|
qDebug() << "Consume ok: " << name;
|
||||||
declared = false;
|
declared = false;
|
||||||
|
|
||||||
|
|
@ -411,7 +421,6 @@ void QueuePrivate::consumeOk( const QAMQP::Frame::Method & frame )
|
||||||
|
|
||||||
void QueuePrivate::deliver( const QAMQP::Frame::Method & frame )
|
void QueuePrivate::deliver( const QAMQP::Frame::Method & frame )
|
||||||
{
|
{
|
||||||
qDebug() << "* Receive message: ";
|
|
||||||
declared = false;
|
declared = false;
|
||||||
|
|
||||||
QByteArray data = frame.arguments();
|
QByteArray data = frame.arguments();
|
||||||
|
|
@ -427,12 +436,6 @@ void QueuePrivate::deliver( const QAMQP::Frame::Method & frame )
|
||||||
QString exchangeName = readField('s',in).toString();
|
QString exchangeName = readField('s',in).toString();
|
||||||
QString routingKey = readField('s',in).toString();
|
QString routingKey = readField('s',in).toString();
|
||||||
|
|
||||||
qDebug() << "| Delivery-tag: " << deliveryTag;
|
|
||||||
qDebug() << "| Redelivered: " << redelivered;
|
|
||||||
qDebug("| Exchange-name: %s", qPrintable(exchangeName));
|
|
||||||
qDebug("| Routing-key: %s", qPrintable(routingKey));
|
|
||||||
|
|
||||||
|
|
||||||
MessagePtr newMessage = MessagePtr(new Message);
|
MessagePtr newMessage = MessagePtr(new Message);
|
||||||
newMessage->routeKey = routingKey;
|
newMessage->routeKey = routingKey;
|
||||||
newMessage->exchangeName = exchangeName;
|
newMessage->exchangeName = exchangeName;
|
||||||
|
|
@ -444,9 +447,6 @@ void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame )
|
||||||
{
|
{
|
||||||
if(frame.channel() != number)
|
if(frame.channel() != number)
|
||||||
return;
|
return;
|
||||||
QFile::remove("dump.jpg");
|
|
||||||
qDebug() << "Content-type: " << qPrintable(frame.property(Content::cpContentType).toString());
|
|
||||||
qDebug() << "Encoding-type: " << qPrintable(frame.property(Content::cpContentEncoding).toString());
|
|
||||||
if(messages_.isEmpty())
|
if(messages_.isEmpty())
|
||||||
{
|
{
|
||||||
qErrnoWarning("Received content-header without method frame before");
|
qErrnoWarning("Received content-header without method frame before");
|
||||||
|
|
|
||||||
|
|
@ -48,7 +48,7 @@ namespace QAMQP
|
||||||
QString type;
|
QString type;
|
||||||
Queue::QueueOptions options;
|
Queue::QueueOptions options;
|
||||||
|
|
||||||
void _q_method(const QAMQP::Frame::Method & frame);
|
bool _q_method(const QAMQP::Frame::Method & frame);
|
||||||
|
|
||||||
bool deleyedDeclare;
|
bool deleyedDeclare;
|
||||||
bool declared;
|
bool declared;
|
||||||
|
|
|
||||||
42
src/test.cpp
42
src/test.cpp
|
|
@ -1,19 +1,28 @@
|
||||||
#include "test.h"
|
#include "test.h"
|
||||||
|
#include <QTime>
|
||||||
|
#include <QFile>
|
||||||
|
|
||||||
Test::Test()
|
Test::Test()
|
||||||
{
|
{
|
||||||
QUrl con(QString("amqp://guest:guest@localhost:5672/"));
|
QUrl con(QString("amqp://guest:guest@localhost:5672/"));
|
||||||
client_ = new QAMQP::Client(this);
|
client_ = new QAMQP::Client(this);
|
||||||
client_->open(con);
|
client_->open(con);
|
||||||
exchange_ = client_->createExchange("test.test");
|
exchange_ = client_->createExchange("test.test2");
|
||||||
exchange_->declare("direct");
|
exchange_->declare("fanout");
|
||||||
|
|
||||||
queue_ = client_->createQueue("test.my_queue", exchange_->channelNumber());
|
queue_ = client_->createQueue("test.my_queue", exchange_->channelNumber());
|
||||||
queue_->declare();
|
queue_->declare();
|
||||||
|
|
||||||
|
queue2_ = client_->createQueue("test.my_queue2");
|
||||||
|
queue2_->declare();
|
||||||
|
|
||||||
exchange_->bind(queue_);
|
exchange_->bind(queue_);
|
||||||
|
exchange_->bind(queue2_);
|
||||||
|
|
||||||
connect(queue_, SIGNAL(declared()), this, SLOT(declared()));
|
connect(queue_, SIGNAL(declared()), this, SLOT(declared()));
|
||||||
|
connect(queue_, SIGNAL(messageRecieved()), this, SLOT(newMessage()));
|
||||||
|
|
||||||
|
connect(queue2_, SIGNAL(messageRecieved()), this, SLOT(newMessage()));
|
||||||
}
|
}
|
||||||
|
|
||||||
Test::~Test()
|
Test::~Test()
|
||||||
|
|
@ -24,5 +33,32 @@ Test::~Test()
|
||||||
void Test::declared()
|
void Test::declared()
|
||||||
{
|
{
|
||||||
qDebug("\t-= Ready =-");
|
qDebug("\t-= Ready =-");
|
||||||
exchange_->publish("test 3432 432 24 23 423 32 23 4324 32 423 423 423", exchange_->name());
|
//queue_->purge();
|
||||||
|
QFile f("D:/geoip.eap");
|
||||||
|
f.open(QIODevice::ReadOnly);
|
||||||
|
exchange_->publish(f.readAll(), exchange_->name(), "image/jpg");
|
||||||
|
//queue_->remove(true, false, false);
|
||||||
|
queue_->setQOS(0,10);
|
||||||
|
queue_->setConsumerTag("qamqp-consumer");
|
||||||
|
queue_->consume(QAMQP::Queue::coNoAck);
|
||||||
|
|
||||||
|
queue2_->setQOS(0,10);
|
||||||
|
queue2_->setConsumerTag("qamqp-consumer2");
|
||||||
|
queue2_->consume(QAMQP::Queue::coNoAck);
|
||||||
|
//exchange_->remove(false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Test::newMessage()
|
||||||
|
{
|
||||||
|
QAMQP::Queue * q = qobject_cast<QAMQP::Queue *>(sender());
|
||||||
|
while (q->hasMessage())
|
||||||
|
{
|
||||||
|
QAMQP::MessagePtr message = q->getMessage();
|
||||||
|
qDebug("+ RECEIVE MESSAGE");
|
||||||
|
qDebug("| Exchange-name: %s", qPrintable(message->exchangeName));
|
||||||
|
qDebug("| Routing-key: %s", qPrintable(message->routeKey));
|
||||||
|
qDebug("| Content-type: %s", qPrintable(message->property[QAMQP::Frame::Content::cpContentType].toString()));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -15,8 +15,10 @@ public:
|
||||||
private slots:
|
private slots:
|
||||||
|
|
||||||
void declared();
|
void declared();
|
||||||
|
void newMessage();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
QPointer<QAMQP::Client> client_;
|
QPointer<QAMQP::Client> client_;
|
||||||
QPointer<QAMQP::Exchange> exchange_;
|
QPointer<QAMQP::Exchange> exchange_;
|
||||||
QPointer<QAMQP::Queue> queue_;
|
QPointer<QAMQP::Queue> queue_, queue2_;
|
||||||
};
|
};
|
||||||
Loading…
Reference in New Issue