added ConnectionHandler::onProperties and TcpConnectionHandler::onProperties

This commit is contained in:
Emiel Bruijntjes 2018-11-07 23:22:25 +01:00
parent f0ad90dd65
commit 06dc23190d
7 changed files with 128 additions and 26 deletions

View File

@ -41,16 +41,49 @@ public:
*/ */
virtual ~ConnectionHandler() = default; virtual ~ConnectionHandler() = default;
/**
* When the connection is being set up, the client and server exchange
* some information. This includes for example their name and version,
* copyright statement and the operating system name. Nothing in this
* exchange of information is very relevant for the actual AMQP protocol,
* but by overriding this method you can read out the information that
* was sent by the server, and you can decide which information you
* want to send back that describe the client. In RabbitMQ's management
* console the client-properties are visible on the "connections" tab,
* which could be helpful in certain scenarios, like debugging.
*
* The read-only "server" parameter contains the information sent by
* the server, while the "client" table may be filled with information
* about your application. The AMQP protocol says that this table should
* at least be filled with data for the "product", "version", "platform",
* "copyright" and "information" keys. However, you do not have to
* override this method, and even when you do, you do not have to ensure
* that these properties are indeed set, because the AMQP-CPP library
* takes care of filling in properties that were not explicitly set.
*
* @param connection The connection about which information is exchanged
* @param server Properties sent by the server
* @param client Properties that are to be sent back
*/
virtual void onProperties(Connection *connection, const Table &server, Table &client)
{
// make sure compilers dont complaint about unused parameters
(void) connection;
(void) server;
(void) client;
}
/** /**
* Method that is called when the heartbeat frequency is negotiated * Method that is called when the heartbeat frequency is negotiated
* between the server and the client durion connection setup. You * between the server and the client durion connection setup. You
* normally do not have to override this method, because in the default * normally do not have to override this method, because in the default
* implementation the suggested heartbeat is simply accepted by the client. * implementation the suggested heartbeat is simply rejected by the client.
* *
* However, if you want to disable heartbeats, or when you want an * However, if you want to enable heartbeats you can override this
* alternative heartbeat interval, you can override this method * method. You should "return interval" if you want to accept the
* to use an other interval. You should return 0 if you want to * heartbeat interval that was suggested by the server, or you can
* disable heartbeats. * return an alternative value if you want a shorter or longer interval.
* Return 0 if you want to disable heartbeats.
* *
* If heartbeats are enabled, you yourself are responsible to send * If heartbeats are enabled, you yourself are responsible to send
* out a heartbeat every *interval* number of seconds by calling * out a heartbeat every *interval* number of seconds by calling
@ -71,7 +104,9 @@ public:
} }
/** /**
* Method that is called when data needs to be sent over the network * Method that is called by AMQP-CPP when data has to be sent over the
* network. You must implement this method and send the data over a
* socket that is connected with RabbitMQ.
* *
* Note that the AMQP library does no buffering by itself. This means * Note that the AMQP library does no buffering by itself. This means
* that this method should always send out all data or do the buffering * that this method should always send out all data or do the buffering
@ -84,7 +119,8 @@ public:
virtual void onData(Connection *connection, const char *buffer, size_t size) = 0; virtual void onData(Connection *connection, const char *buffer, size_t size) = 0;
/** /**
* Method that is called when the server sends a heartbeat to the client * Method that is called when the AMQP-CPP library received a heartbeat
* frame that was sent by the server to the client.
* *
* You do not have to do anything here, the client sends back a heartbeat * You do not have to do anything here, the client sends back a heartbeat
* frame automatically, but if you like, you can implement/override this * frame automatically, but if you like, you can implement/override this
@ -100,7 +136,8 @@ public:
/** /**
* When the connection ends up in an error state this method is called. * When the connection ends up in an error state this method is called.
* This happens when data comes in that does not match the AMQP protocol * This happens when data comes in that does not match the AMQP protocol,
* or when an error message was sent by the server to the client.
* *
* After this method is called, the connection no longer is in a valid * After this method is called, the connection no longer is in a valid
* state and can no longer be used. * state and can no longer be used.
@ -126,10 +163,11 @@ public:
* *
* According to the AMQP protocol, you must wait for the connection to become * According to the AMQP protocol, you must wait for the connection to become
* ready (and this onConnected method to be called) before you can start * ready (and this onConnected method to be called) before you can start
* using the Connection object. However, this AMQP library will cache all * sending instructions to RabbitMQ. However, if you prematurely do send
* methods that you call before the connection is ready, so in reality there * instructions, this AMQP-CPP library caches all methods that you call
* is no real reason to wait for this method to be called before you send * before the connection is ready and flushes them the moment the connection
* the first instructions. * has been set up, so technically there is no real reason to wait for this
* method to be called before you send the first instructions.
* *
* @param connection The connection that can now be used * @param connection The connection that can now be used
*/ */
@ -143,7 +181,7 @@ public:
* Method that is called when the connection was closed. * Method that is called when the connection was closed.
* *
* This is the counter part of a call to Connection::close() and it confirms * This is the counter part of a call to Connection::close() and it confirms
* that the connection was correctly closed. * that the connection was _correctly_ closed.
* *
* @param connection The connection that was closed and that is now unusable * @param connection The connection that was closed and that is now unusable
*/ */

View File

@ -205,9 +205,20 @@ public:
/** /**
* Mark the protocol as being ok * Mark the protocol as being ok
* @param server properties sent by the server
* @param client properties to be send back
*/ */
void setProtocolOk() void setProtocolOk(const Table &server, Table &client)
{ {
// if object is destructed
Monitor monitor(this);
// check if user-space wants to set these properties
_handler->onProperties(_parent, server, client);
// leap out if userspace destructed the object
if (!monitor.valid()) return;
// move on to handshake state // move on to handshake state
if (_state == state_protocol) _state = state_handshake; if (_state == state_protocol) _state = state_handshake;
} }

View File

@ -59,6 +59,15 @@ private:
friend TcpChannel; friend TcpChannel;
/**
* Method that is called when the RabbitMQ server and your client application
* exchange some properties that describe their identity.
* @param connection The connection about which information is exchanged
* @param server Properties sent by the server
* @param client Properties that are to be sent back
*/
virtual void onProperties(Connection *connection, const Table &server, Table &client) override;
/** /**
* Method that is called when the heartbeat frequency is negotiated. * Method that is called when the heartbeat frequency is negotiated.
* @param connection The connection that suggested a heartbeat interval * @param connection The connection that suggested a heartbeat interval

View File

@ -85,6 +85,21 @@ public:
return true; return true;
} }
/**
* Method that is called when the RabbitMQ server and your client application
* exchange some properties that describe their identity.
* @param connection The connection about which information is exchanged
* @param server Properties sent by the server
* @param client Properties that are to be sent back
*/
virtual void onProperties(TcpConnection *connection, const Table &server, Table &client)
{
// make sure compilers dont complaint about unused parameters
(void) connection;
(void) server;
(void) client;
}
/** /**
* Method that is called when the heartbeat frequency is negotiated * Method that is called when the heartbeat frequency is negotiated
* between the server and the client. Applications can override this method * between the server and the client. Applications can override this method

View File

@ -139,6 +139,16 @@ public:
Table &set(const std::string &name, const std::string &value) { return set(name, LongString(value)); } Table &set(const std::string &name, const std::string &value) { return set(name, LongString(value)); }
Table &set(const std::string &name, const char *value) { return set(name, LongString(std::string(value))); } Table &set(const std::string &name, const char *value) { return set(name, LongString(std::string(value))); }
/**
* Is a certain field set in the table
* @param name
* @return bool
*/
bool contains(const std::string &name) const
{
return _fields.find(name) != _fields.end();
}
/** /**
* Get a field * Get a field
* *

View File

@ -183,25 +183,25 @@ public:
*/ */
virtual bool process(ConnectionImpl *connection) override virtual bool process(ConnectionImpl *connection) override
{ {
// the client properties
Table properties;
// move connection to handshake mode
connection->setProtocolOk(_properties, properties);
// the capabilities // the capabilities
Table capabilities; Table capabilities;
// we want a special treatment for authentication failures // we want a special treatment for authentication failures
capabilities["authentication_failure_close"] = true; capabilities["authentication_failure_close"] = true;
// the peer properties
Table properties;
// fill the peer properties // fill the peer properties
properties["product"] = "Copernica AMQP library"; if (!properties.contains("product")) properties["product"] = "Copernica AMQP library";
properties["version"] = "Unknown"; if (!properties.contains("version")) properties["version"] = "Unknown";
properties["platform"] = "Unknown"; if (!properties.contains("platform")) properties["platform"] = "Unknown";
properties["copyright"] = "Copyright 2015 - 2018 Copernica BV"; if (!properties.contains("copyright")) properties["copyright"] = "Copyright 2015 - 2018 Copernica BV";
properties["information"] = "https://www.copernica.com"; if (!properties.contains("information")) properties["information"] = "https://www.copernica.com";
properties["capabilities"] = capabilities; if (!properties.contains("capabilities")) properties["capabilities"] = capabilities;
// move connection to handshake mode
connection->setProtocolOk();
// send back a connection start ok frame // send back a connection start ok frame
connection->send(ConnectionStartOKFrame(properties, "PLAIN", connection->login().saslPlain(), "en_US")); connection->send(ConnectionStartOKFrame(properties, "PLAIN", connection->login().saslPlain(), "en_US"));

View File

@ -101,9 +101,15 @@ bool TcpConnection::close(bool immediate)
// if no immediate disconnect is needed, we can simply start the closing handshake // if no immediate disconnect is needed, we can simply start the closing handshake
if (!immediate) return _connection.close(); if (!immediate) return _connection.close();
// failing the connection could destruct "this"
Monitor monitor(this);
// fail the connection / report the error to user-space // fail the connection / report the error to user-space
_connection.fail("connection prematurely closed by client"); _connection.fail("connection prematurely closed by client");
// stop if object was destructed
if (!monitor.valid()) return true;
// change the state // change the state
_state.reset(new TcpClosed(this)); _state.reset(new TcpClosed(this));
@ -111,6 +117,19 @@ bool TcpConnection::close(bool immediate)
return true; return true;
} }
/**
* Method that is called when the RabbitMQ server and your client application
* exchange some properties that describe their identity.
* @param connection The connection about which information is exchanged
* @param server Properties sent by the server
* @param client Properties that are to be sent back
*/
void TcpConnection::onProperties(Connection *connection, const Table &server, Table &client)
{
// tell the handler
return _handler->onProperties(this, server, client);
}
/** /**
* Method that is called when the heartbeat frequency is negotiated. * Method that is called when the heartbeat frequency is negotiated.
* @param connection The connection that suggested a heartbeat interval * @param connection The connection that suggested a heartbeat interval