From d1be1063a53c77fa2798a592b3e8bc0be556c2ca Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Wed, 23 Nov 2022 22:09:15 +0100 Subject: [PATCH] implemented "consumer cancel notifications" (see https://www.rabbitmq.com/consumer-cancel.html) --- examples/libev.cpp | 33 +++++++++++++++++++++++++---- include/amqpcpp/channelimpl.h | 9 +++++++- include/amqpcpp/deferredconsumer.h | 34 +++++++++++++++++++++++++++++- src/basiccancelframe.h | 22 ++++++++++++++++++- src/channelimpl.cpp | 25 +++++++++++++++++++++- src/connectionstartframe.h | 6 +++++- 6 files changed, 120 insertions(+), 9 deletions(-) diff --git a/examples/libev.cpp b/examples/libev.cpp index cf9a78d..555387d 100644 --- a/examples/libev.cpp +++ b/examples/libev.cpp @@ -4,7 +4,7 @@ * Test program to check AMQP functionality based on LibEV * * @author Emiel Bruijntjes - * @copyright 2015 - 2018 Copernica BV + * @copyright 2015 - 2022 Copernica BV */ /** @@ -180,10 +180,10 @@ int main() AMQP::TcpChannel channel(&connection); // create a temporary queue - channel.declareQueue(AMQP::exclusive).onSuccess([&connection, &channel, loop](const std::string &name, uint32_t messagecount, uint32_t consumercount) { + channel.declareQueue(AMQP::exclusive).onSuccess([&connection, &channel, loop](const std::string &queuename, uint32_t messagecount, uint32_t consumercount) { // report the name of the temporary queue - std::cout << "declared queue " << name << std::endl; + std::cout << "declared queue " << queuename << std::endl; // close the channel //channel.close().onSuccess([&connection, &channel]() { @@ -196,7 +196,32 @@ int main() //}); // construct a timer that is going to publish stuff - auto *timer = new MyTimer(loop, &channel, name); + auto *timer = new MyTimer(loop, &channel, queuename); + + // start a consumer + channel.consume(queuename).onSuccess([](const std::string &tag) { + + // the consumer is ready + std::cout << "started consuming with tag " << tag << std::endl; + + }).onCancelled([](const std::string &tag) { + + // the consumer was cancelled by the server + std::cout << "consumer " << tag << " was cancelled" << std::endl; + + }).onReceived([&channel, queuename](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { + + std::cout << "received " << deliveryTag << std::endl; + + // we remove the queue -- to see if this indeed causes the onCancelled method to be called + if (deliveryTag > 3) channel.removeQueue(queuename); + + // ack the message + channel.ack(deliveryTag); + + }); + + //connection.close(); }); diff --git a/include/amqpcpp/channelimpl.h b/include/amqpcpp/channelimpl.h index c487808..e6dea16 100644 --- a/include/amqpcpp/channelimpl.h +++ b/include/amqpcpp/channelimpl.h @@ -5,7 +5,7 @@ * that has a private constructor so that it can not be used from outside * the AMQP library * - * @copyright 2014 - 2020 Copernica BV + * @copyright 2014 - 2022 Copernica BV */ /** @@ -712,6 +712,13 @@ public: return true; } + /** + * Report that a consumer was cancelled by the server (for example because the + * queue was removed or the node on which the queue was stored was terminated) + * @param tag the consumer tag + */ + void reportCancelled(const std::string &tag); + /** * Report an error message on a channel * @param message the error message diff --git a/include/amqpcpp/deferredconsumer.h b/include/amqpcpp/deferredconsumer.h index 40626c0..4fc1439 100644 --- a/include/amqpcpp/deferredconsumer.h +++ b/include/amqpcpp/deferredconsumer.h @@ -3,7 +3,7 @@ * * Deferred callback for consumers * - * @copyright 2014 - 2018 Copernica BV + * @copyright 2014 - 2022 Copernica BV */ /** @@ -38,6 +38,12 @@ private: */ ConsumeCallback _consumeCallback; + /** + * Callback to excute when the server has cancelled the consumer + * @var CancelCallback + */ + CancelCallback _cancelCallback; + /** * Process a delivery frame * @@ -52,6 +58,16 @@ private: */ virtual const std::shared_ptr &reportSuccess(const std::string &name) override; + /** + * Report that the server has cancelled this consumer + * @param namae The consumer tag + */ + void reportCancelled(const std::string &name) + { + // report + if (_cancelCallback) _cancelCallback(name); + } + /** * Get reference to self to prevent that object falls out of scope * @return std::shared_ptr @@ -283,6 +299,22 @@ public: // allow chaining return *this; } + + /** + * Register a funtion to be called when the server cancelled the consumer + * + * @param callback The callback to invoke + * @return Same object for chaining + */ + inline DeferredConsumer &onCancelled(const CancelCallback& callback) { return onCancelled(CancelCallback(callback)); } + DeferredConsumer &onCancelled(CancelCallback&& callback) + { + // store callback + _cancelCallback = std::move(callback); + + // allow chaining + return *this; + } }; /** diff --git a/src/basiccancelframe.h b/src/basiccancelframe.h index cdcf916..9e82caa 100644 --- a/src/basiccancelframe.h +++ b/src/basiccancelframe.h @@ -1,7 +1,7 @@ /** * Class describing a basic cancel frame * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2022 Copernica BV */ /** @@ -106,6 +106,26 @@ public: { return _noWait.get(0); } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + auto channel = connection->channel(this->channel()); + + // channel does not exist + if (!channel) return false; + + // report + channel->reportCancelled(consumerTag()); + + // done + return true; + } }; /** diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 086da31..32417cd 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -3,7 +3,7 @@ * * Implementation for a channel * - * @copyright 2014 - 2020 Copernica BV + * @copyright 2014 - 2022 Copernica BV */ #include "includes.h" #include "basicgetokframe.h" @@ -946,6 +946,29 @@ void ChannelImpl::reportError(const char *message, bool notifyhandler) _connection = nullptr; } +/** + * Report that a consumer was cancelled by the server (for example because the + * queue was removed or the node on which the queue was stored was terminated) + * @param tag the consumer tag + */ +void ChannelImpl::reportCancelled(const std::string &tag) +{ + // look in the map + auto iter = _consumers.find(tag); + + // check if there is not even such a consumer + if (iter == _consumers.end()) return; + + // the actual consumer (this is a shared pointer) + auto consumer = iter->second; + + // remove from the map + _consumers.erase(iter); + + // report that the consumer was cancelled + consumer->reportCancelled(tag); +} + /** * Get the current receiver for a given consumer tag * @param consumertag the consumer frame diff --git a/src/connectionstartframe.h b/src/connectionstartframe.h index 2d3e061..a203046 100644 --- a/src/connectionstartframe.h +++ b/src/connectionstartframe.h @@ -5,7 +5,7 @@ * is opened. It contains the initial connection properties, and the protocol * number. * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2022 Copernica BV */ /** @@ -195,6 +195,10 @@ public: // we want a special treatment for authentication failures capabilities["authentication_failure_close"] = true; + // when the server cancels a consumer (for example because a queue is delete, or the node on which the + // queue lives dies, we want to receive a notification that the consumer is no longer alive) + capabilities["consumer_cancel_notify"] = true; + // fill the peer properties if (!properties.contains("product")) properties["product"] = "Copernica AMQP library"; if (!properties.contains("version")) properties["version"] = "Unknown";