implemented connection.blocked handling and callbacks

This commit is contained in:
Timo Sluis 2023-07-03 11:39:06 +02:00
parent 12f4314d2a
commit 4fb968f7a4
6 changed files with 247 additions and 1 deletions

View File

@ -192,6 +192,38 @@ public:
// make sure compilers dont complain about unused parameters
(void) connection;
}
/**
* Method that is called when the AMQP connection was blocked.
*
* This method is called, when the server connection gets blocked for the first
* time due to the broker running low on a resource (memory or disk). For
* example, when a RabbitMQ node detects that it is low on RAM, it sends a
* notification to all connected publishing clients supporting this feature.
* If before the connections are unblocked the node also starts running low on
* disk space, another notification will not be sent.
*
* @param connection The connection that was blocked
*/
virtual void onBlocked(Connection *connection)
{
// make sure compilers dont complain about unused parameters
(void) connection;
}
/**
* Method that is called when the AMQP connection is no longer blocked.
*
* This method is called when all resource alarms have cleared and the
* connection is fully unblocked.
*
* @param connection The connection that is no longer blocked
*/
virtual void onUnblocked(Connection *connection)
{
// make sure compilers dont complain about unused parameters
(void) connection;
}
};
/**

View File

@ -66,7 +66,7 @@ protected:
state_handshake, // busy with the handshake to open the connection
state_connected, // connection is set up and ready for communication
state_closing, // connection is busy closing (we have sent the close frame)
state_closed // connection is closed
state_closed, // connection is closed
} _state = state_protocol;
/**
@ -428,6 +428,24 @@ public:
_handler->onClosed(_parent);
}
/**
* Report that the connection is blocked
*/
void reportBlocked()
{
// inform the handler
_handler->onBlocked(_parent);
}
/**
* Report that the connection is unblocked
*/
void reportUnblocked()
{
// inform the handler
_handler->onUnblocked(_parent);
}
/**
* Retrieve the amount of channels this connection has
* @return std::size_t

108
src/connectionblockframe.h Normal file
View File

@ -0,0 +1,108 @@
/**
* Class describing a connection blocked frame
*
* This frame is sent by the server to the client, when their connection gets
* blocked for the first time due to the broker running low on a resource
* (memory or disk). For example, when a RabbitMQ node detects that it is low
* on RAM, it sends a notification to all connected publishing clients
* supporting this feature. If before the connections are unblocked the node
* also starts running low on disk space, another notification will not be sent.
*
* @copyright 2023 Copernica BV
*/
/**
* Set up namespace
*/
namespace AMQP {
/**
* Class implementation
*/
class ConnectionBlockFrame : public ConnectionFrame
{
private:
/**
* The reason for blocking
* @var ShortString
*/
ShortString _reason;
protected:
/**
* Encode a frame on a string buffer
*
* @param buffer buffer to write frame to
*/
virtual void fill(OutBuffer& buffer) const override
{
// call base
ConnectionFrame::fill(buffer);
// encode the field
_reason.fill(buffer);
}
public:
/**
* Construct a connection blocked frame from a received frame
*
* @param frame received frame
*/
ConnectionBlockFrame(ReceivedFrame &frame) :
ConnectionFrame(frame),
_reason(frame)
{}
/**
* Construct a connection blocked frame
*
* @param reason the reason for blocking
*/
ConnectionBlockFrame(uint16_t code, std::string reason) :
ConnectionFrame((uint32_t)(reason.length() + 1)), // 1 for extra string byte
_reason(std::move(reason))
{}
/**
* Destructor
*/
virtual ~ConnectionBlockFrame() {}
/**
* Method id
* @return uint16_t
*/
virtual uint16_t methodID() const override
{
return 60;
}
/**
* Get the reason for blocking
* @return string
*/
const std::string& reason() const
{
return _reason;
}
/**
* Process the frame
* @param connection
*/
virtual bool process(ConnectionImpl *connection) override
{
// report that it is blocked
connection->reportBlocked();
// done
return true;
}
};
/**
* end namespace
*/
}

View File

@ -219,6 +219,9 @@ public:
// queue lives dies, we want to receive a notification that the consumer is no longer alive)
capabilities["consumer_cancel_notify"] = true;
// when the rabbitmq server reaches its max capacity, it can send a notification to us, we want them
capabilities["connection.blocked"] = true;
// fill the peer properties
if (!properties.contains("version")) properties["version"] = "AMQP-CPP " VERSION_NAME;
if (!properties.contains("copyright")) properties["copyright"] = "Copernica AMQP-CPP library :: Copyright 2015-2023 Copernica BV";

View File

@ -0,0 +1,81 @@
/**
* Class describing a connection unblocked frame
*
* This frame is sent by the server to the client, when all resource alarms
* have cleared and the connection is fully unblocked.
*
* @copyright 2023 Copernica BV
*/
/**
* Set up namespace
*/
namespace AMQP {
/**
* Class implementation
*/
class ConnectionUnblockFrame : public ConnectionFrame
{
protected:
/**
* Encode a frame on a string buffer
*
* @param buffer buffer to write frame to
*/
virtual void fill(OutBuffer& buffer) const override
{
// call base
ConnectionFrame::fill(buffer);
}
public:
/**
* Construct a connection unblocked frame from a received frame
*
* @param frame received frame
*/
ConnectionUnblockFrame(ReceivedFrame &frame) :
ConnectionFrame(frame)
{}
/**
* Construct a connection unblocked frame
*/
ConnectionUnblockFrame(uint16_t code, std::string reason) :
ConnectionFrame(0)
{}
/**
* Destructor
*/
virtual ~ConnectionUnblockFrame() {}
/**
* Method id
* @return uint16_t
*/
virtual uint16_t methodID() const override
{
return 61;
}
/**
* Process the frame
* @param connection
*/
virtual bool process(ConnectionImpl *connection) override
{
// report that it is no longer blocked
connection->reportUnblocked();
// done
return true;
}
};
/**
* end namespace
*/
}

View File

@ -19,6 +19,8 @@
#include "connectiontuneframe.h"
#include "connectioncloseokframe.h"
#include "connectioncloseframe.h"
#include "connectionblockframe.h"
#include "connectionunblockframe.h"
#include "channelopenframe.h"
#include "channelopenokframe.h"
#include "channelflowframe.h"
@ -195,6 +197,8 @@ bool ReceivedFrame::processConnectionFrame(ConnectionImpl *connection)
case 41: return ConnectionOpenOKFrame(*this).process(connection);
case 50: return ConnectionCloseFrame(*this).process(connection);
case 51: return ConnectionCloseOKFrame(*this).process(connection);
case 60: return ConnectionBlockFrame(*this).process(connection);
case 61: return ConnectionUnblockFrame(*this).process(connection);
}
// this is a problem