diff --git a/include/channel.h b/include/channel.h index bf0e30d..aff013f 100644 --- a/include/channel.h +++ b/include/channel.h @@ -248,6 +248,30 @@ public: */ bool removeQueue(const std::string &name, int flags = 0) { return _implementation.removeQueue(name, flags); } + /** + * Publish a message to an exchange + * + * The following flags can be used + * + * - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method + * - immediate if set, a message that could not immediately be consumed is returned to the onReturned method + * + * @todo implement to onReturned() method + * + * @param exchange the exchange to publish to + * @param routingkey the routing key + * @param flags optional flags (see above) + * @param envelope the full envelope to send + * @param message the message to send + * @param size size of the message + */ + bool publish(const std::string &exchange, const std::string &routingKey, int flags, const Envelope &envelope) { return _implementation.publish(exchange, routingKey, flags, envelope); } + bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation.publish(exchange, routingKey, 0, envelope); } + bool publish(const std::string &exchange, const std::string &routingKey, int flags, const std::string &message) { return _implementation.publish(exchange, routingKey, flags, Envelope(message)); } + bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation.publish(exchange, routingKey, 0, Envelope(message)); } + bool publish(const std::string &exchange, const std::string &routingKey, int flags, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, flags, Envelope(message, size)); } + bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, 0, Envelope(message, size)); } + /** * Close the current channel * @return bool diff --git a/include/channelimpl.h b/include/channelimpl.h index d9b8660..a3198be 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -209,6 +209,25 @@ public: * @return bool */ bool removeQueue(const std::string &name, int flags); + + /** + * Publish a message to an exchange + * + * The following flags can be used + * + * - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method + * - immediate if set, a message that could not immediately be consumed is returned to the onReturned method + * + * @todo implement to onReturned() method + * + * @param exchange the exchange to publish to + * @param routingkey the routing key + * @param flags optional flags (see above) + * @param envelope the full envelope to send + * @param message the message to send + * @param size size of the message + */ + bool publish(const std::string &exchange, const std::string &routingKey, int flags, const Envelope &envelope); /** * Close the current channel @@ -275,7 +294,7 @@ public: * Report an error message on a channel * @param message */ - void reportChannelError(const std::string &message) + void reportError(const std::string &message) { // change state _state = state_closed; @@ -289,7 +308,7 @@ public: */ void reportExchangeDeclared() { - if(_handler) _handler->onExchangeDeclared(_parent); + if (_handler) _handler->onExchangeDeclared(_parent); } /** @@ -297,7 +316,7 @@ public: */ void reportExchangeDeleted() { - if(_handler) _handler->onExchangeDeleted(_parent); + if (_handler) _handler->onExchangeDeleted(_parent); } /** @@ -305,7 +324,7 @@ public: */ void reportExchangeBound() { - if(_handler) _handler->onExchangeBound(_parent); + if (_handler) _handler->onExchangeBound(_parent); } /** @@ -313,7 +332,7 @@ public: */ void reportExchangeUnbound() { - if(_handler) _handler->onExchangeUnbound(_parent); + if (_handler) _handler->onExchangeUnbound(_parent); } /** @@ -324,7 +343,7 @@ public: */ void reportQueueDeclared(const std::string &queueName, uint32_t messageCount, uint32_t consumerCount) { - if(_handler) _handler->onQueueDeclared(_parent, queueName, messageCount, consumerCount); + if (_handler) _handler->onQueueDeclared(_parent, queueName, messageCount, consumerCount); } /** @@ -332,7 +351,7 @@ public: */ void reportQueueBound() { - if(_handler) _handler->onQueueBound(_parent); + if (_handler) _handler->onQueueBound(_parent); } /** @@ -340,7 +359,7 @@ public: */ void reportQueueUnbound() { - if(_handler) _handler->onQueueUnbound(_parent); + if (_handler) _handler->onQueueUnbound(_parent); } /** @@ -349,7 +368,7 @@ public: */ void reportQueueDeleted(uint32_t messageCount) { - if(_handler) _handler->onQueueDeleted(_parent, messageCount); + if (_handler) _handler->onQueueDeleted(_parent, messageCount); } /** @@ -358,7 +377,7 @@ public: */ void reportQueuePurged(uint32_t messageCount) { - if(_handler) _handler->onQueuePurged(_parent, messageCount); + if (_handler) _handler->onQueuePurged(_parent, messageCount); } /** diff --git a/include/connectionimpl.h b/include/connectionimpl.h index 2e2d914..63e8ef7 100644 --- a/include/connectionimpl.h +++ b/include/connectionimpl.h @@ -241,9 +241,10 @@ public: * Report an error message * @param message */ - void reportConnectionError(const std::string &message) + void reportError(const std::string &message) { // close everything + // @todo is this not duplicate? close(); // set connection state to closed diff --git a/include/envelope.h b/include/envelope.h index 3b6a27c..7cc4545 100644 --- a/include/envelope.h +++ b/include/envelope.h @@ -121,6 +121,12 @@ public: * @param size */ Envelope(const char *data, size_t size) : _data(data), _size(size) {} + + /** + * Constructor based on a string + * @param message + */ + Envelope(const std::string &data) : _data(data.data()), _size(data.size()) {} /** * Destructor diff --git a/src/channelcloseframe.cpp b/src/channelcloseframe.cpp index 7aad0be..226bc15 100644 --- a/src/channelcloseframe.cpp +++ b/src/channelcloseframe.cpp @@ -29,7 +29,7 @@ bool ChannelCloseFrame::process(ConnectionImpl *connection) if (!channel) return false; // report to the handler - channel->reportChannelError(text()); + channel->reportError(text()); // done return true; diff --git a/src/connectioncloseframe.cpp b/src/connectioncloseframe.cpp index 9cd2ae3..eecff10 100644 --- a/src/connectioncloseframe.cpp +++ b/src/connectioncloseframe.cpp @@ -19,12 +19,14 @@ namespace AMQP { */ bool ConnectionCloseFrame::process(ConnectionImpl *connection) { + // @todo connection could be destructed after frame was sent + // send back the ok frame connection->send(ConnectionCloseOKFrame()); // no need to check for a channel, the error is connection wide // report the error on the connection - connection->reportConnectionError(text()); + connection->reportError(text()); // done return true; diff --git a/src/connectionimpl.cpp b/src/connectionimpl.cpp index 51874b9..73d02b8 100644 --- a/src/connectionimpl.cpp +++ b/src/connectionimpl.cpp @@ -138,7 +138,7 @@ size_t ConnectionImpl::parse(char *buffer, size_t size) catch (const ProtocolException &exception) { // something terrible happened on the protocol (like data out of range) - reportConnectionError(exception.what()); + reportError(exception.what()); // done return processed;