diff --git a/README.md b/README.md new file mode 100644 index 0000000..abb4013 --- /dev/null +++ b/README.md @@ -0,0 +1,61 @@ +QAMQP +============= +Qt4 implementation of AMQP 0.9.1. + +Implement +------------ +### Connection +work with socket connections + +* start - start connection negotiation +* startok - select security mechanism and locale +* tune - propose connection tuning parameters +* tuneok - negotiate connection tuning parameters +* open - open connection to virtual host +* openok - signal that connection is ready +* close - request a connection close +* closeok - confirm a connection close + +### Channel +work with channels + +* open - open a channel for use +* openok - signal that the channel is ready +* close - request a channel close +* closeok - confirm a channel close + +### Exchange +work with exchanges + +* declare - verify exchange exists, create if needed +* declareok - confirm exchange declaration +* delete - delete an exchange +* deleteok - confirm deletion of an exchange + +### Queue +work with queues + +* declare - declare queue, create if needed +* declareok - confirms a queue definition +* bind - bind queue to an exchange +* bindok - confirm bind successful +* unbind - unbind a queue from an exchange +* unbindok - confirm unbind successful +* purge - purge a queue +* purgeok - confirms a queue purge +* delete - delete a queue +* deleteok - confirm deletion of a queue + +### Basic +work with basic content + +* qos - specify quality of service +* qosok - confirm the requested qos +* consume - start a queue consumer +* consumeok - confirm a new consumer +* publish - publish a message +* deliver - notify the client of a consumer message +* get - direct access to a queue +* getok - provide client with a message +* getempty - indicate no messages available +* ack - acknowledge one or more messages diff --git a/src/qamqp/amqp_channel_p.h b/src/qamqp/amqp_channel_p.h index c1f899e..c42dab3 100644 --- a/src/qamqp/amqp_channel_p.h +++ b/src/qamqp/amqp_channel_p.h @@ -36,7 +36,7 @@ namespace QAMQP bmReturn = 50, bmDeliver = 60, METHOD_ID_ENUM(bmGet, 70), - bmgetEmpty = 72, + bmGetEmpty = 72, bmAck = 80, bmReject = 90, bmRecoverAsync = 100, diff --git a/src/qamqp/amqp_exchange.h b/src/qamqp/amqp_exchange.h index e527a2e..1478a0a 100644 --- a/src/qamqp/amqp_exchange.h +++ b/src/qamqp/amqp_exchange.h @@ -54,6 +54,7 @@ namespace QAMQP Q_SIGNALS: void declared(); + void binded(); void removed(); }; } diff --git a/src/qamqp/amqp_message.h b/src/qamqp/amqp_message.h index 5266723..e71cb97 100644 --- a/src/qamqp/amqp_message.h +++ b/src/qamqp/amqp_message.h @@ -11,6 +11,7 @@ namespace QAMQP { qDebug("Message create"); leftSize = 0; + deliveryTag = 0; } ~Message() { @@ -19,6 +20,7 @@ namespace QAMQP typedef QAMQP::Frame::Content::Property MessageProperty; Q_DECLARE_FLAGS(MessageProperties, MessageProperty); + qlonglong deliveryTag; QByteArray payload; QHash property; QAMQP::Frame::TableField headers; diff --git a/src/qamqp/amqp_queue.cpp b/src/qamqp/amqp_queue.cpp index be1f6b3..7be721f 100644 --- a/src/qamqp/amqp_queue.cpp +++ b/src/qamqp/amqp_queue.cpp @@ -73,7 +73,15 @@ Queue::QueueOptions Queue::option() const return d_func()->options; } +void Queue::setNoAck( bool noAck ) +{ + d_func()->noAck = noAck; +} +bool Queue::noAck() const +{ + return d_func()->noAck; +} void Queue::declare() { @@ -153,6 +161,18 @@ QString Queue::consumerTag() const return d_func()->consumerTag; } + +void Queue::get() +{ + d_func()->get(); +} + + +void Queue::ack( const MessagePtr & message ) +{ + d_func()->ack(message); +} + ////////////////////////////////////////////////////////////////////////// @@ -161,6 +181,7 @@ QueuePrivate::QueuePrivate() , deleyedDeclare(false) , declared(false) , recievingMessage(false) + , noAck(true) { } @@ -211,6 +232,12 @@ bool QueuePrivate::_q_method( const QAMQP::Frame::Method & frame ) case bmDeliver: deliver(frame); break; + case bmGetOk: + getOk(frame); + break; + case bmGetEmpty: + QMetaObject::invokeMethod(q_func(), "empty"); + break; default: break; } @@ -382,6 +409,61 @@ void QueuePrivate::unbind( const QString & exchangeName, const QString & key ) } +void QueuePrivate::get() +{ + if(!opened) + { + return; + } + + QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmGet); + frame.setChannel(number); + QByteArray arguments_; + QDataStream out(&arguments_, QIODevice::WriteOnly); + out << qint16(0); //reserver 1 + writeField('s', out, name); + out << qint8(noAck ? 1 : 0); // noAck + + frame.setArguments(arguments_); + sendFrame(frame); +} + + +void QueuePrivate::getOk( const QAMQP::Frame::Method & frame ) +{ + QByteArray data = frame.arguments(); + QDataStream in(&data, QIODevice::ReadOnly); + + qlonglong deliveryTag = readField('L',in).toLongLong(); + bool redelivered = readField('t',in).toBool(); + QString exchangeName = readField('s',in).toString(); + QString routingKey = readField('s',in).toString(); + + MessagePtr newMessage = MessagePtr(new Message); + newMessage->routeKey = routingKey; + newMessage->exchangeName = exchangeName; + newMessage->deliveryTag = deliveryTag; + messages_.enqueue(newMessage); +} + + +void QueuePrivate::ack( const MessagePtr & Message ) +{ + if(!opened) + { + return; + } + + QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmAck); + frame.setChannel(number); + QByteArray arguments_; + QDataStream out(&arguments_, QIODevice::WriteOnly); + out << Message->deliveryTag; //reserver 1 + out << qint8(0); // noAck + + frame.setArguments(arguments_); + sendFrame(frame); +} void QueuePrivate::consume( Queue::ConsumeOptions options ) { @@ -421,7 +503,6 @@ void QueuePrivate::consumeOk( const QAMQP::Frame::Method & frame ) void QueuePrivate::deliver( const QAMQP::Frame::Method & frame ) { - declared = false; QByteArray data = frame.arguments(); QDataStream in(&data, QIODevice::ReadOnly); @@ -439,6 +520,7 @@ void QueuePrivate::deliver( const QAMQP::Frame::Method & frame ) MessagePtr newMessage = MessagePtr(new Message); newMessage->routeKey = routingKey; newMessage->exchangeName = exchangeName; + newMessage->deliveryTag = deliveryTag; messages_.enqueue(newMessage); } diff --git a/src/qamqp/amqp_queue.h b/src/qamqp/amqp_queue.h index a537a59..cacff6b 100644 --- a/src/qamqp/amqp_queue.h +++ b/src/qamqp/amqp_queue.h @@ -17,6 +17,7 @@ namespace QAMQP Q_PROPERTY(QueueOptions option READ option ); Q_PROPERTY(QString consumerTag READ consumerTag WRITE setConsumerTag) + Q_PROPERTY(bool noAck READ noAck WRITE setNoAck) Q_DECLARE_PRIVATE(QAMQP::Queue) Q_DISABLE_COPY(Queue); @@ -62,16 +63,22 @@ namespace QAMQP void unbind(Exchange * exchange, const QString & key); MessagePtr getMessage(); + void get(); + void ack(const MessagePtr & message); bool hasMessage() const; void consume(ConsumeOptions options = ConsumeOptions(NoOptions)); void setConsumerTag(const QString &consumerTag); QString consumerTag() const; + + void setNoAck(bool noAck); + bool noAck() const; Q_SIGNALS: void declared(); void binded(bool); void removed(); void messageRecieved(); + void empty(); private: Q_PRIVATE_SLOT(d_func(), void _q_content(const QAMQP::Frame::Content & frame)) diff --git a/src/qamqp/amqp_queue_p.h b/src/qamqp/amqp_queue_p.h index d1f1a84..cb944c5 100644 --- a/src/qamqp/amqp_queue_p.h +++ b/src/qamqp/amqp_queue_p.h @@ -41,9 +41,12 @@ namespace QAMQP /************************************************************************/ void consume(Queue::ConsumeOptions options); - void consumeOk(const QAMQP::Frame::Method & frame); - + void consumeOk(const QAMQP::Frame::Method & frame); void deliver(const QAMQP::Frame::Method & frame); + + void get(); + void getOk(const QAMQP::Frame::Method & frame); + void ack(const MessagePtr & Message); QString type; Queue::QueueOptions options; @@ -52,6 +55,7 @@ namespace QAMQP bool deleyedDeclare; bool declared; + bool noAck; QString consumerTag; QMap delayedBindings;