implemented "consumer cancel notifications" (see https://www.rabbitmq.com/consumer-cancel.html)

This commit is contained in:
Emiel Bruijntjes 2022-11-23 22:09:15 +01:00
parent 3e91687a9d
commit d1be1063a5
6 changed files with 120 additions and 9 deletions

View File

@ -4,7 +4,7 @@
* Test program to check AMQP functionality based on LibEV
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2015 - 2018 Copernica BV
* @copyright 2015 - 2022 Copernica BV
*/
/**
@ -180,10 +180,10 @@ int main()
AMQP::TcpChannel channel(&connection);
// create a temporary queue
channel.declareQueue(AMQP::exclusive).onSuccess([&connection, &channel, loop](const std::string &name, uint32_t messagecount, uint32_t consumercount) {
channel.declareQueue(AMQP::exclusive).onSuccess([&connection, &channel, loop](const std::string &queuename, uint32_t messagecount, uint32_t consumercount) {
// report the name of the temporary queue
std::cout << "declared queue " << name << std::endl;
std::cout << "declared queue " << queuename << std::endl;
// close the channel
//channel.close().onSuccess([&connection, &channel]() {
@ -196,7 +196,32 @@ int main()
//});
// construct a timer that is going to publish stuff
auto *timer = new MyTimer(loop, &channel, name);
auto *timer = new MyTimer(loop, &channel, queuename);
// start a consumer
channel.consume(queuename).onSuccess([](const std::string &tag) {
// the consumer is ready
std::cout << "started consuming with tag " << tag << std::endl;
}).onCancelled([](const std::string &tag) {
// the consumer was cancelled by the server
std::cout << "consumer " << tag << " was cancelled" << std::endl;
}).onReceived([&channel, queuename](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::cout << "received " << deliveryTag << std::endl;
// we remove the queue -- to see if this indeed causes the onCancelled method to be called
if (deliveryTag > 3) channel.removeQueue(queuename);
// ack the message
channel.ack(deliveryTag);
});
//connection.close();
});

View File

@ -5,7 +5,7 @@
* that has a private constructor so that it can not be used from outside
* the AMQP library
*
* @copyright 2014 - 2020 Copernica BV
* @copyright 2014 - 2022 Copernica BV
*/
/**
@ -712,6 +712,13 @@ public:
return true;
}
/**
* Report that a consumer was cancelled by the server (for example because the
* queue was removed or the node on which the queue was stored was terminated)
* @param tag the consumer tag
*/
void reportCancelled(const std::string &tag);
/**
* Report an error message on a channel
* @param message the error message

View File

@ -3,7 +3,7 @@
*
* Deferred callback for consumers
*
* @copyright 2014 - 2018 Copernica BV
* @copyright 2014 - 2022 Copernica BV
*/
/**
@ -38,6 +38,12 @@ private:
*/
ConsumeCallback _consumeCallback;
/**
* Callback to excute when the server has cancelled the consumer
* @var CancelCallback
*/
CancelCallback _cancelCallback;
/**
* Process a delivery frame
*
@ -52,6 +58,16 @@ private:
*/
virtual const std::shared_ptr<Deferred> &reportSuccess(const std::string &name) override;
/**
* Report that the server has cancelled this consumer
* @param namae The consumer tag
*/
void reportCancelled(const std::string &name)
{
// report
if (_cancelCallback) _cancelCallback(name);
}
/**
* Get reference to self to prevent that object falls out of scope
* @return std::shared_ptr
@ -283,6 +299,22 @@ public:
// allow chaining
return *this;
}
/**
* Register a funtion to be called when the server cancelled the consumer
*
* @param callback The callback to invoke
* @return Same object for chaining
*/
inline DeferredConsumer &onCancelled(const CancelCallback& callback) { return onCancelled(CancelCallback(callback)); }
DeferredConsumer &onCancelled(CancelCallback&& callback)
{
// store callback
_cancelCallback = std::move(callback);
// allow chaining
return *this;
}
};
/**

View File

@ -1,7 +1,7 @@
/**
* Class describing a basic cancel frame
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2022 Copernica BV
*/
/**
@ -106,6 +106,26 @@ public:
{
return _noWait.get(0);
}
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
auto channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// report
channel->reportCancelled(consumerTag());
// done
return true;
}
};
/**

View File

@ -3,7 +3,7 @@
*
* Implementation for a channel
*
* @copyright 2014 - 2020 Copernica BV
* @copyright 2014 - 2022 Copernica BV
*/
#include "includes.h"
#include "basicgetokframe.h"
@ -946,6 +946,29 @@ void ChannelImpl::reportError(const char *message, bool notifyhandler)
_connection = nullptr;
}
/**
* Report that a consumer was cancelled by the server (for example because the
* queue was removed or the node on which the queue was stored was terminated)
* @param tag the consumer tag
*/
void ChannelImpl::reportCancelled(const std::string &tag)
{
// look in the map
auto iter = _consumers.find(tag);
// check if there is not even such a consumer
if (iter == _consumers.end()) return;
// the actual consumer (this is a shared pointer)
auto consumer = iter->second;
// remove from the map
_consumers.erase(iter);
// report that the consumer was cancelled
consumer->reportCancelled(tag);
}
/**
* Get the current receiver for a given consumer tag
* @param consumertag the consumer frame

View File

@ -5,7 +5,7 @@
* is opened. It contains the initial connection properties, and the protocol
* number.
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2022 Copernica BV
*/
/**
@ -195,6 +195,10 @@ public:
// we want a special treatment for authentication failures
capabilities["authentication_failure_close"] = true;
// when the server cancels a consumer (for example because a queue is delete, or the node on which the
// queue lives dies, we want to receive a notification that the consumer is no longer alive)
capabilities["consumer_cancel_notify"] = true;
// fill the peer properties
if (!properties.contains("product")) properties["product"] = "Copernica AMQP library";
if (!properties.contains("version")) properties["version"] = "Unknown";