From 267c9801f000a859a714f857c22a44d354c7a73a Mon Sep 17 00:00:00 2001 From: fuCtor Date: Thu, 23 Feb 2012 05:54:04 -0800 Subject: [PATCH] [+] Authenticator - authentication mechanism abstraction [+] AMQPLAIN authentication Authenticator implement --- QAMQP.vcproj | 8 ++++ src/qamqp/amqp.cpp | 66 ++++++++++++++++++++++++++++---- src/qamqp/amqp.h | 5 ++- src/qamqp/amqp_authenticator.cpp | 47 +++++++++++++++++++++++ src/qamqp/amqp_authenticator.h | 33 ++++++++++++++++ src/qamqp/amqp_channel.cpp | 11 ++++-- src/qamqp/amqp_channel_p.h | 4 +- src/qamqp/amqp_connection.cpp | 18 ++++----- src/qamqp/amqp_connection_p.h | 2 +- src/qamqp/amqp_exchange.cpp | 12 +++--- src/qamqp/amqp_exchange_p.h | 2 +- src/qamqp/amqp_message.h | 5 +++ src/qamqp/amqp_p.h | 9 ++++- src/qamqp/amqp_queue.cpp | 30 +++++++-------- src/qamqp/amqp_queue_p.h | 2 +- src/test.cpp | 44 +++++++++++++++++++-- src/test.h | 4 +- 17 files changed, 247 insertions(+), 55 deletions(-) create mode 100644 src/qamqp/amqp_authenticator.cpp create mode 100644 src/qamqp/amqp_authenticator.h diff --git a/QAMQP.vcproj b/QAMQP.vcproj index 22a7fb7..efe4726 100644 --- a/QAMQP.vcproj +++ b/QAMQP.vcproj @@ -166,6 +166,10 @@ RelativePath=".\src\qamqp\amqp.cpp" > + + @@ -230,6 +234,10 @@ /> + + diff --git a/src/qamqp/amqp.cpp b/src/qamqp/amqp.cpp index 4e6c1a1..c5dc36e 100644 --- a/src/qamqp/amqp.cpp +++ b/src/qamqp/amqp.cpp @@ -6,6 +6,7 @@ #include "qamqp_global.h" #include "amqp_exchange.h" #include "amqp_queue.h" +#include "amqp_authenticator.h" using namespace QAMQP; @@ -31,8 +32,6 @@ ClientPrivate::ClientPrivate(int version ) , port(AMQPPORT) , host(QString::fromLatin1(AMQPHOST)) , virtualHost(QString::fromLatin1(AMQPVHOST)) - , user(QString::fromLatin1(AMQPLOGIN)) - , password(QString::fromLatin1(AMQPPSWD)) { } @@ -55,6 +54,8 @@ void ClientPrivate::init(QObject * parent) connection_ = new QAMQP::Connection(q_func()); } + setAuth(new AMQPlainAuthenticator(QString::fromLatin1(AMQPLOGIN), QString::fromLatin1(AMQPPSWD))); + QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)), connection_, SLOT(_q_method(const QAMQP::Frame::Method &))); } @@ -66,14 +67,27 @@ void ClientPrivate::init(QObject * parent, const QUrl & con) ClientPrivate::connect(); } + +void ClientPrivate::setAuth( Authenticator* auth ) +{ + auth_ = QSharedPointer(auth); +} + + void ClientPrivate::printConnect() const { QTextStream stream(stdout); stream << "port = " << port << endl; stream << "host = " << host << endl; stream << "vhost = " << virtualHost << endl; - stream << "user = " << user << endl; - stream << "passw = " << password << endl; + + if(auth_ && auth_->type() == "AMQPLAIN") + { + QSharedPointer a = auth_.staticCast(); + stream << "user = " << a->login() << endl; + stream << "passw = " << a->password() << endl; + } + } void ClientPrivate::connect() @@ -221,22 +235,48 @@ void QAMQP::Client::setVirtualHost( const QString & virtualHost ) QString QAMQP::Client::user() const { - return d_func()->user; + const Authenticator * auth = d_func()->auth_.data(); + + if(auth && auth->type() == "AMQPLAIN") + { + const AMQPlainAuthenticator * a = static_cast(auth); + return a->login(); + } + return QString(); } void QAMQP::Client::setUser( const QString & user ) { - d_func()->user = user; + Authenticator * auth = d_func()->auth_.data(); + + if(auth && auth->type() == "AMQPLAIN") + { + AMQPlainAuthenticator * a = static_cast(auth); + a->setLogin(user); + } } QString QAMQP::Client::password() const { - return d_func()->password; + const Authenticator * auth = d_func()->auth_.data(); + + if(auth && auth->type() == "AMQPLAIN") + { + const AMQPlainAuthenticator * a = static_cast(auth); + return a->password(); + } + return QString(); } void QAMQP::Client::setPassword( const QString & password ) { - d_func()->password = password; + Authenticator * auth = d_func()->auth_.data(); + + if(auth && auth->type() == "AMQPLAIN") + { + AMQPlainAuthenticator * a = static_cast(auth); + a->setPassword(password); + } } void QAMQP::Client::printConnect() const @@ -291,4 +331,14 @@ void QAMQP::Client::reopen() { return d_func()->connect(); return d_func()->disconnect(); +} + +void QAMQP::Client::setAuth( Authenticator * auth ) +{ + d_func()->setAuth(auth); +} + +Authenticator * QAMQP::Client::auth() const +{ + return d_func()->auth_.data(); } \ No newline at end of file diff --git a/src/qamqp/amqp.h b/src/qamqp/amqp.h index a2e9d37..2ccf50b 100644 --- a/src/qamqp/amqp.h +++ b/src/qamqp/amqp.h @@ -9,6 +9,7 @@ namespace QAMQP class Exchange; class Queue; class ClientPrivate; + class Authenticator; class ConnectionPrivate; class Client : public QObject { @@ -53,7 +54,9 @@ namespace QAMQP QString password() const; void setPassword(const QString & password); - + + void setAuth(Authenticator * auth); + Authenticator * auth() const; void open(); void open(const QUrl & connectionString); void close(); diff --git a/src/qamqp/amqp_authenticator.cpp b/src/qamqp/amqp_authenticator.cpp new file mode 100644 index 0000000..4cc6b52 --- /dev/null +++ b/src/qamqp/amqp_authenticator.cpp @@ -0,0 +1,47 @@ +#include "amqp_authenticator.h" +#include "amqp_frame.h" + +QString QAMQP::AMQPlainAuthenticator::login() const +{ + return login_; +} + +QString QAMQP::AMQPlainAuthenticator::password() const +{ + return password_; +} + +QAMQP::AMQPlainAuthenticator::AMQPlainAuthenticator( const QString & l /*= QString()*/, const QString & p /*= QString()*/ ) +{ + login_ = l; + password_ = p; +} + +QAMQP::AMQPlainAuthenticator::~AMQPlainAuthenticator() +{ + +} + +QString QAMQP::AMQPlainAuthenticator::type() const +{ + return "AMQPLAIN"; +} + +void QAMQP::AMQPlainAuthenticator::setLogin( const QString& l ) +{ + login_ = l; +} + +void QAMQP::AMQPlainAuthenticator::setPassword( const QString &p ) +{ + password_ = p; +} + +void QAMQP::AMQPlainAuthenticator::write( QDataStream & out ) +{ + QAMQP::Frame::writeField('s', out, type()); + QAMQP::Frame::TableField response; + response["LOGIN"] = login_; + response["PASSWORD"] = password_; + QAMQP::Frame::serialize(out, response); +} \ No newline at end of file diff --git a/src/qamqp/amqp_authenticator.h b/src/qamqp/amqp_authenticator.h new file mode 100644 index 0000000..4d46f9d --- /dev/null +++ b/src/qamqp/amqp_authenticator.h @@ -0,0 +1,33 @@ +#ifndef amqp_authenticator_h__ +#define amqp_authenticator_h__ + +#include "qamqp_global.h" +#include +#include + +namespace QAMQP +{ + class Authenticator + { + public: + virtual ~Authenticator(){}; + virtual QString type() const = 0; + virtual void write(QDataStream & out) = 0; + }; + + class AMQPlainAuthenticator : public Authenticator + { + QString login_, password_; + public: + AMQPlainAuthenticator(const QString & login = QString(), const QString & password = QString()); + virtual ~AMQPlainAuthenticator(); + QString login() const; + void setLogin(const QString& l); + QString password() const; + void setPassword(const QString &p); + virtual QString type() const; + virtual void write(QDataStream & out); + }; + +} +#endif // amqp_authenticator_h__ \ No newline at end of file diff --git a/src/qamqp/amqp_channel.cpp b/src/qamqp/amqp_channel.cpp index 175794b..9dee78e 100644 --- a/src/qamqp/amqp_channel.cpp +++ b/src/qamqp/amqp_channel.cpp @@ -159,11 +159,13 @@ void ChannelPrivate::init(int channelNumber, Client * parent) } -void ChannelPrivate::_q_method( const QAMQP::Frame::Method & frame ) +bool ChannelPrivate::_q_method( const QAMQP::Frame::Method & frame ) { - if(frame.methodClass() != QAMQP::Frame::fcChannel - || frame.channel() != number ) - return; + if(frame.channel() != number ) + return true; + + if(frame.methodClass() != QAMQP::Frame::fcChannel) + return false; qDebug("Channel#%d:", number); @@ -185,6 +187,7 @@ void ChannelPrivate::_q_method( const QAMQP::Frame::Method & frame ) closeOk(frame); break; } + return true; } void ChannelPrivate::_q_open() diff --git a/src/qamqp/amqp_channel_p.h b/src/qamqp/amqp_channel_p.h index 95317e2..c1f899e 100644 --- a/src/qamqp/amqp_channel_p.h +++ b/src/qamqp/amqp_channel_p.h @@ -44,7 +44,7 @@ namespace QAMQP }; ChannelPrivate(int version = QObjectPrivateVersion); - ~ChannelPrivate(); + virtual ~ChannelPrivate(); void init(int channelNumber, Client * parent); @@ -62,7 +62,7 @@ namespace QAMQP void close(const QAMQP::Frame::Method & frame); void closeOk(const QAMQP::Frame::Method & frame); - virtual void _q_method(const QAMQP::Frame::Method & frame); + virtual bool _q_method(const QAMQP::Frame::Method & frame); void _q_open(); void setQOS(qint32 prefetchSize, quint16 prefetchCount); diff --git a/src/qamqp/amqp_connection.cpp b/src/qamqp/amqp_connection.cpp index 69ece86..141a8ae 100644 --- a/src/qamqp/amqp_connection.cpp +++ b/src/qamqp/amqp_connection.cpp @@ -57,16 +57,13 @@ void ConnectionPrivate::startOk() QDataStream stream(&arguments_, QIODevice::WriteOnly); QAMQP::Frame::TableField clientProperties; - clientProperties["version"] = "0.0.1"; + clientProperties["version"] = "0.0.3"; clientProperties["platform"] = QString("Qt %1").arg(qVersion()); clientProperties["product"] = "QAMQP"; QAMQP::Frame::serialize(stream, clientProperties); - QAMQP::Frame::writeField('s', stream, "AMQPLAIN"); - QAMQP::Frame::TableField response; - response["LOGIN"] = client_->user(); - response["PASSWORD"] = client_->password(); - QAMQP::Frame::serialize(stream, response); + client_->d_func()->auth_->write(stream); + QAMQP::Frame::writeField('s', stream, "en_US"); frame.setArguments(arguments_); @@ -234,10 +231,10 @@ void ConnectionPrivate::setQOS( qint32 prefetchSize, quint16 prefetchCount, int } -void ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame ) +bool ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame ) { if(frame.methodClass() != QAMQP::Frame::fcConnection) - return; + return true; qDebug() << "Connection:"; @@ -245,7 +242,7 @@ void ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame ) { if( frame.id() == miCloseOk) closeOk(frame); - return; + return true; } switch(MethodId(frame.id())) @@ -270,8 +267,9 @@ void ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame ) break; default: qWarning("Unknown method-id %d", frame.id()); + return false; } - + return true; } ////////////////////////////////////////////////////////////////////////// diff --git a/src/qamqp/amqp_connection_p.h b/src/qamqp/amqp_connection_p.h index d93c219..3ceecb1 100644 --- a/src/qamqp/amqp_connection_p.h +++ b/src/qamqp/amqp_connection_p.h @@ -38,7 +38,7 @@ namespace QAMQP void openOk(const QAMQP::Frame::Method & frame); void close(const QAMQP::Frame::Method & frame); void closeOk(const QAMQP::Frame::Method & frame); - void _q_method(const QAMQP::Frame::Method & frame); + bool _q_method(const QAMQP::Frame::Method & frame); void setQOS(qint32 prefetchSize, quint16 prefetchCount, int channel, bool global); diff --git a/src/qamqp/amqp_exchange.cpp b/src/qamqp/amqp_exchange.cpp index e0e713e..ac36121 100644 --- a/src/qamqp/amqp_exchange.cpp +++ b/src/qamqp/amqp_exchange.cpp @@ -128,12 +128,13 @@ ExchangePrivate::~ExchangePrivate() } -void ExchangePrivate::_q_method( const QAMQP::Frame::Method & frame ) +bool ExchangePrivate::_q_method( const QAMQP::Frame::Method & frame ) { - ChannelPrivate::_q_method(frame); - if(frame.methodClass() != QAMQP::Frame::fcExchange - || frame.channel() != number ) - return; + if(ChannelPrivate::_q_method(frame)) + return true; + + if(frame.methodClass() != QAMQP::Frame::fcExchange) + return false; switch(frame.id()) { @@ -146,6 +147,7 @@ void ExchangePrivate::_q_method( const QAMQP::Frame::Method & frame ) default: break; } + return true; } void ExchangePrivate::declareOk( const QAMQP::Frame::Method & frame ) diff --git a/src/qamqp/amqp_exchange_p.h b/src/qamqp/amqp_exchange_p.h index f91f429..7a28a43 100644 --- a/src/qamqp/amqp_exchange_p.h +++ b/src/qamqp/amqp_exchange_p.h @@ -30,7 +30,7 @@ namespace QAMQP Exchange::ExchangeOptions options; TableField arguments; - void _q_method(const QAMQP::Frame::Method & frame); + bool _q_method(const QAMQP::Frame::Method & frame); bool deleyedDeclare; bool declared; diff --git a/src/qamqp/amqp_message.h b/src/qamqp/amqp_message.h index 640078a..5266723 100644 --- a/src/qamqp/amqp_message.h +++ b/src/qamqp/amqp_message.h @@ -9,8 +9,13 @@ namespace QAMQP { Message() { + qDebug("Message create"); leftSize = 0; } + ~Message() + { + qDebug("Message release"); + } typedef QAMQP::Frame::Content::Property MessageProperty; Q_DECLARE_FLAGS(MessageProperties, MessageProperty); diff --git a/src/qamqp/amqp_p.h b/src/qamqp/amqp_p.h index b41b3c6..5edd73b 100644 --- a/src/qamqp/amqp_p.h +++ b/src/qamqp/amqp_p.h @@ -2,9 +2,11 @@ #define qamqp_amqp_p_h__ #include +#include #include "amqp_network.h" #include "amqp_connection.h" +#include "amqp_authenticator.h" namespace QAMQP { @@ -23,17 +25,20 @@ namespace QAMQP void parseCnnString( const QUrl & connectionString); void sockConnect(); void login(); - + void setAuth(Authenticator* auth); Exchange * createExchange(int channelNumber, const QString &name); Queue * createQueue(int channelNumber, const QString &name); quint32 port; QString host; QString virtualHost; +/* QString user; - QString password; + QString password;*/ + QPointer network_; QPointer connection_; + QSharedPointer auth_; }; } #endif // amqp_p_h__ diff --git a/src/qamqp/amqp_queue.cpp b/src/qamqp/amqp_queue.cpp index ef0aaf9..be1f6b3 100644 --- a/src/qamqp/amqp_queue.cpp +++ b/src/qamqp/amqp_queue.cpp @@ -171,11 +171,10 @@ QueuePrivate::~QueuePrivate() } -void QueuePrivate::_q_method( const QAMQP::Frame::Method & frame ) +bool QueuePrivate::_q_method( const QAMQP::Frame::Method & frame ) { - ChannelPrivate::_q_method(frame); - if(frame.channel() != number) - return; + if(ChannelPrivate::_q_method(frame)) + return true; if(frame.methodClass() == QAMQP::Frame::fcQueue) { @@ -199,6 +198,7 @@ void QueuePrivate::_q_method( const QAMQP::Frame::Method & frame ) default: break; } + return true; } if(frame.methodClass() == QAMQP::Frame::fcBasic) @@ -214,13 +214,17 @@ void QueuePrivate::_q_method( const QAMQP::Frame::Method & frame ) default: break; } + return true; } + + return false; } void QueuePrivate::declareOk( const QAMQP::Frame::Method & frame ) { + qDebug() << "Declared queue: " << name; declared = true; @@ -237,7 +241,7 @@ void QueuePrivate::declareOk( const QAMQP::Frame::Method & frame ) void QueuePrivate::deleteOk( const QAMQP::Frame::Method & frame ) { - qDebug() << "Deleted or purged queue: " << name; + qDebug() << "Deleted or purged queue: " << name; declared = false; QByteArray data = frame.arguments(); @@ -257,6 +261,7 @@ void QueuePrivate::bindOk( const QAMQP::Frame::Method & frame ) void QueuePrivate::unbindOk( const QAMQP::Frame::Method & frame ) { + qDebug() << "Unbinded queue: " << name; QMetaObject::invokeMethod(q_func(), "binded", Q_ARG(bool, false)); } @@ -281,6 +286,7 @@ void QueuePrivate::declare() frame.setArguments(arguments_); sendFrame(frame); deleyedDeclare = false; + } @@ -307,6 +313,7 @@ void QueuePrivate::remove( bool ifUnused /*= true*/, bool ifEmpty /*= true*/, bo frame.setArguments(arguments_); sendFrame(frame); + } void QueuePrivate::purge() @@ -349,6 +356,7 @@ void QueuePrivate::bind( const QString & exchangeName, const QString & key ) frame.setArguments(arguments_); sendFrame(frame); + } void QueuePrivate::unbind( const QString & exchangeName, const QString & key ) @@ -394,11 +402,13 @@ void QueuePrivate::consume( Queue::ConsumeOptions options ) frame.setArguments(arguments_); sendFrame(frame); + } void QueuePrivate::consumeOk( const QAMQP::Frame::Method & frame ) { + qDebug() << "Consume ok: " << name; declared = false; @@ -411,7 +421,6 @@ void QueuePrivate::consumeOk( const QAMQP::Frame::Method & frame ) void QueuePrivate::deliver( const QAMQP::Frame::Method & frame ) { - qDebug() << "* Receive message: "; declared = false; QByteArray data = frame.arguments(); @@ -427,12 +436,6 @@ void QueuePrivate::deliver( const QAMQP::Frame::Method & frame ) QString exchangeName = readField('s',in).toString(); QString routingKey = readField('s',in).toString(); - qDebug() << "| Delivery-tag: " << deliveryTag; - qDebug() << "| Redelivered: " << redelivered; - qDebug("| Exchange-name: %s", qPrintable(exchangeName)); - qDebug("| Routing-key: %s", qPrintable(routingKey)); - - MessagePtr newMessage = MessagePtr(new Message); newMessage->routeKey = routingKey; newMessage->exchangeName = exchangeName; @@ -444,9 +447,6 @@ void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame ) { if(frame.channel() != number) return; - QFile::remove("dump.jpg"); - qDebug() << "Content-type: " << qPrintable(frame.property(Content::cpContentType).toString()); - qDebug() << "Encoding-type: " << qPrintable(frame.property(Content::cpContentEncoding).toString()); if(messages_.isEmpty()) { qErrnoWarning("Received content-header without method frame before"); diff --git a/src/qamqp/amqp_queue_p.h b/src/qamqp/amqp_queue_p.h index 6648e4b..d1f1a84 100644 --- a/src/qamqp/amqp_queue_p.h +++ b/src/qamqp/amqp_queue_p.h @@ -48,7 +48,7 @@ namespace QAMQP QString type; Queue::QueueOptions options; - void _q_method(const QAMQP::Frame::Method & frame); + bool _q_method(const QAMQP::Frame::Method & frame); bool deleyedDeclare; bool declared; diff --git a/src/test.cpp b/src/test.cpp index 36936d5..1c83a8e 100644 --- a/src/test.cpp +++ b/src/test.cpp @@ -1,19 +1,28 @@ #include "test.h" +#include +#include Test::Test() { QUrl con(QString("amqp://guest:guest@localhost:5672/")); client_ = new QAMQP::Client(this); client_->open(con); - exchange_ = client_->createExchange("test.test"); - exchange_->declare("direct"); + exchange_ = client_->createExchange("test.test2"); + exchange_->declare("fanout"); queue_ = client_->createQueue("test.my_queue", exchange_->channelNumber()); queue_->declare(); + queue2_ = client_->createQueue("test.my_queue2"); + queue2_->declare(); + exchange_->bind(queue_); + exchange_->bind(queue2_); connect(queue_, SIGNAL(declared()), this, SLOT(declared())); + connect(queue_, SIGNAL(messageRecieved()), this, SLOT(newMessage())); + + connect(queue2_, SIGNAL(messageRecieved()), this, SLOT(newMessage())); } Test::~Test() @@ -24,5 +33,32 @@ Test::~Test() void Test::declared() { qDebug("\t-= Ready =-"); - exchange_->publish("test 3432 432 24 23 423 32 23 4324 32 423 423 423", exchange_->name()); -} \ No newline at end of file + //queue_->purge(); + QFile f("D:/geoip.eap"); + f.open(QIODevice::ReadOnly); + exchange_->publish(f.readAll(), exchange_->name(), "image/jpg"); + //queue_->remove(true, false, false); + queue_->setQOS(0,10); + queue_->setConsumerTag("qamqp-consumer"); + queue_->consume(QAMQP::Queue::coNoAck); + + queue2_->setQOS(0,10); + queue2_->setConsumerTag("qamqp-consumer2"); + queue2_->consume(QAMQP::Queue::coNoAck); + //exchange_->remove(false, false); +} + +void Test::newMessage() +{ + QAMQP::Queue * q = qobject_cast(sender()); + while (q->hasMessage()) + { + QAMQP::MessagePtr message = q->getMessage(); + qDebug("+ RECEIVE MESSAGE"); + qDebug("| Exchange-name: %s", qPrintable(message->exchangeName)); + qDebug("| Routing-key: %s", qPrintable(message->routeKey)); + qDebug("| Content-type: %s", qPrintable(message->property[QAMQP::Frame::Content::cpContentType].toString())); + + } + +} diff --git a/src/test.h b/src/test.h index 667d388..27e768c 100644 --- a/src/test.h +++ b/src/test.h @@ -15,8 +15,10 @@ public: private slots: void declared(); + void newMessage(); + private: QPointer client_; QPointer exchange_; - QPointer queue_; + QPointer queue_, queue2_; }; \ No newline at end of file