From dddd16b74d1cb67eaf940a1adec314e4361444d8 Mon Sep 17 00:00:00 2001 From: David van Erkelens Date: Tue, 5 Jul 2016 16:39:12 +0200 Subject: [PATCH] Make sure single gets are processed correctly as well --- include/channelimpl.h | 6 +++++- include/deferredconsumerbase.h | 9 +++++++++ src/basicgetokframe.h | 7 +++++-- src/deferredconsumerbase.cpp | 19 +++++++++++++++++++ src/deferredget.cpp | 2 +- 5 files changed, 39 insertions(+), 4 deletions(-) diff --git a/include/channelimpl.h b/include/channelimpl.h index ec1f365..c00ef00 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -663,11 +663,15 @@ public: * * @param consumertag The consumer tag * @param consumer The consumer handler + * @param active Is this the new active consumer */ - void install(std::string consumertag, const std::shared_ptr &consumer) + void install(std::string consumertag, const std::shared_ptr &consumer, bool active = false) { // install the consumer handler _consumers[consumertag] = consumer; + + // should we become the current consumer? + if (active) _consumer = consumer; } /** diff --git a/include/deferredconsumerbase.h b/include/deferredconsumerbase.h index 7a15e2d..4419482 100644 --- a/include/deferredconsumerbase.h +++ b/include/deferredconsumerbase.h @@ -28,6 +28,7 @@ namespace AMQP { * Forward declarations */ class BasicDeliverFrame; +class BasicGetOKFrame; class BasicHeaderFrame; class BodyFrame; @@ -52,6 +53,13 @@ private: */ void process(BasicDeliverFrame &frame); + /** + * Process a delivery frame from a get request + * + * @param frame The frame to process + */ + void process(BasicGetOKFrame &frame); + /** * Process the message headers * @@ -85,6 +93,7 @@ private: */ friend class ChannelImpl; friend class BasicDeliverFrame; + friend class BasicGetOKFrame; friend class BasicHeaderFrame; friend class BodyFrame; protected: diff --git a/src/basicgetokframe.h b/src/basicgetokframe.h index be4189f..5cc3a12 100644 --- a/src/basicgetokframe.h +++ b/src/basicgetokframe.h @@ -173,8 +173,11 @@ public: // report success for the get operation channel->reportSuccess(messageCount(), deliveryTag(), redelivered()); - // notice that the channel is not yet synchronized here, because - // we first have to receive the entire body + // check if we have a valid consumer + if (!channel->consumer()) return false; + + // pass on to consumer + channel->consumer()->process(*this); // done return true; diff --git a/src/deferredconsumerbase.cpp b/src/deferredconsumerbase.cpp index fd85f14..d9ab77f 100644 --- a/src/deferredconsumerbase.cpp +++ b/src/deferredconsumerbase.cpp @@ -12,6 +12,7 @@ */ #include "../include/deferredconsumerbase.h" #include "basicdeliverframe.h" +#include "basicgetokframe.h" #include "basicheaderframe.h" #include "bodyframe.h" @@ -38,6 +39,24 @@ void DeferredConsumerBase::process(BasicDeliverFrame &frame) if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey()); } +/** + * Process a delivery frame from a get request + * + * @param frame The frame to process + */ +void DeferredConsumerBase::process(BasicGetOKFrame &frame) +{ + // retrieve the delivery tag and whether we were redelivered + _deliveryTag = frame.deliveryTag(); + _redelivered = frame.redelivered(); + + // anybody interested in the new message? + if (_beginCallback) _beginCallback(); + + // do we have anybody interested in messages? + if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey()); +} + /** * Process the message headers * diff --git a/src/deferredget.cpp b/src/deferredget.cpp index a8b4a20..558c202 100644 --- a/src/deferredget.cpp +++ b/src/deferredget.cpp @@ -31,7 +31,7 @@ const std::shared_ptr &DeferredGet::reportSuccess(uint32_t messagecoun _redelivered = redelivered; // install ourselves in the channel - _channel->install("", shared_from_this()); + _channel->install("", shared_from_this(), true); // report the size (note that this is the size _minus_ the message that is retrieved // (and for which the callback will be called later), so it could be zero)