added support for recovering

This commit is contained in:
Emiel Bruijntjes 2014-01-06 05:38:48 -08:00
parent 5a636f5b57
commit efc556ee0a
6 changed files with 77 additions and 6 deletions

View File

@ -264,10 +264,10 @@ channel.declareQueue("my-queue-name", AMQP::durable + AMQP::autodelete, argument
WORK IN PROGRESS
================
Almost all AMQP features have been implemented. We only need to add recover support
and support for returned messages. We also need to add more safety checks so that
strange data from RabbitMQ does not break the library (although in reality RabbitMQ
only sends valid data).
Almost all AMQP features have been implemented. We only need to add support for
returned messages. We also need to add more safety checks so that strange data
from RabbitMQ does not break the library (although in reality RabbitMQ only sends
valid data).
It would also be nice to have sample implementations for the ConnectionHandler
class that can be directly plugged into libev, libevent and libuv event loops.

View File

@ -380,7 +380,22 @@ public:
*/
bool reject(uint64_t deliveryTag, int flags=0) { return _implementation.reject(deliveryTag, flags); }
bool reject(const Message &message, int flags=0) { return _implementation.reject(message.deliveryTag(), flags); }
/**
* Recover all messages that were not yet acked
*
* This method asks the server to redeliver all unacknowledged messages on a specified
* channel. Zero or more messages may be redelivered.
*
* The following flags are supported:
*
* - requeue if set, the server will requeue the messages, so the could also end up with at different consumer
*
* @param flags
* @return bool
*/
bool recover(int flags = 0) { return _implementation.recover(flags); }
/**
* Close the current channel
* @return bool

View File

@ -175,7 +175,12 @@ public:
*/
virtual void onReceived(Channel *channel, const Message &message) {}
/**
* Method that is called when the server starts recovering messages
* This is the result of a call to Channel::recover()
* @param channel the channel on which the recover method was called
*/
virtual void onRecovering(Channel *channel) {}
};

View File

@ -276,6 +276,13 @@ public:
*/
bool reject(uint64_t deliveryTag, int flags);
/**
* Recover messages that were not yet ack'ed
* @param flags optional flags
* @return bool
*/
bool recover(int flags);
/**
* Close the current channel
* @return bool
@ -458,6 +465,14 @@ public:
*/
void reportReceived();
/**
* Report that the recover operation has started
*/
void reportRecovering()
{
if (_handler) _handler->onRecovering(_parent);
}
/**
* Create an incoming message
* @param frame

View File

@ -57,6 +57,27 @@ public:
{
return 111;
}
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// report
channel->reportRecovering();
// done
return true;
}
};

View File

@ -32,6 +32,7 @@
#include "basiccancelframe.h"
#include "basicackframe.h"
#include "basicnackframe.h"
#include "basicrecoverframe.h"
/**
* Set up namespace
@ -511,6 +512,20 @@ bool ChannelImpl::reject(uint64_t deliveryTag, int flags)
return true;
}
/**
* Recover un-acked messages
* @param flags optional flags
* @return bool
*/
bool ChannelImpl::recover(int flags)
{
// send a nack frame
send(BasicRecoverFrame(_id, flags & requeue));
// done
return true;
}
/**
* Send a frame over the channel
* @param frame frame to send