2015-11-01 01:26:04 +08:00
|
|
|
/**
|
|
|
|
|
* TcpConnected.h
|
|
|
|
|
*
|
|
|
|
|
* The actual tcp connection - this is the "_impl" of a tcp-connection after
|
|
|
|
|
* the hostname was resolved into an IP address
|
|
|
|
|
*
|
|
|
|
|
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
2018-03-06 00:29:37 +08:00
|
|
|
* @copyright 2015 - 2018 Copernica BV
|
2015-11-01 01:26:04 +08:00
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Include guard
|
|
|
|
|
*/
|
|
|
|
|
#pragma once
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Dependencies
|
|
|
|
|
*/
|
2016-06-16 01:41:14 +08:00
|
|
|
#include "tcpoutbuffer.h"
|
2016-06-16 01:32:30 +08:00
|
|
|
#include "tcpinbuffer.h"
|
2018-11-05 06:34:31 +08:00
|
|
|
#include "tcpshutdown.h"
|
2018-06-15 15:19:27 +08:00
|
|
|
#include "poll.h"
|
2015-11-01 01:26:04 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Set up namespace
|
|
|
|
|
*/
|
|
|
|
|
namespace AMQP {
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Class definition
|
|
|
|
|
*/
|
2018-11-05 06:34:31 +08:00
|
|
|
class TcpConnected : public TcpExtState
|
2015-11-01 01:26:04 +08:00
|
|
|
{
|
|
|
|
|
private:
|
|
|
|
|
/**
|
|
|
|
|
* The outgoing buffer
|
2016-06-16 01:41:14 +08:00
|
|
|
* @var TcpOutBuffer
|
2015-11-01 01:26:04 +08:00
|
|
|
*/
|
2016-06-16 01:41:14 +08:00
|
|
|
TcpOutBuffer _out;
|
2015-11-01 01:26:04 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* An incoming buffer
|
2016-06-16 01:32:30 +08:00
|
|
|
* @var TcpInBuffer
|
2015-11-01 01:26:04 +08:00
|
|
|
*/
|
2016-06-16 01:32:30 +08:00
|
|
|
TcpInBuffer _in;
|
2015-11-01 01:26:04 +08:00
|
|
|
|
2016-06-22 19:49:49 +08:00
|
|
|
/**
|
|
|
|
|
* Cached reallocation instruction
|
|
|
|
|
* @var size_t
|
|
|
|
|
*/
|
|
|
|
|
size_t _reallocate = 0;
|
2018-03-08 17:37:49 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Have we already made the last report to the user (about an error or closed connection?)
|
|
|
|
|
* @var bool
|
|
|
|
|
*/
|
|
|
|
|
bool _finalized = false;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
2018-11-05 06:34:31 +08:00
|
|
|
* Start an elegant shutdown
|
|
|
|
|
*
|
|
|
|
|
* @todo remove this method
|
2018-03-08 17:37:49 +08:00
|
|
|
*/
|
2018-11-05 06:34:31 +08:00
|
|
|
void shutdown2()
|
2018-03-08 17:37:49 +08:00
|
|
|
{
|
2018-11-05 06:34:31 +08:00
|
|
|
// we will shutdown the socket in a very elegant way, we notify the peer
|
|
|
|
|
// that we will not be sending out more write operations
|
|
|
|
|
::shutdown(_socket, SHUT_WR);
|
2018-03-08 17:37:49 +08:00
|
|
|
|
2018-11-05 06:34:31 +08:00
|
|
|
// we still monitor the socket for readability to see if our close call was
|
|
|
|
|
// confirmed by the peer
|
|
|
|
|
_parent->onIdle(this, _socket, readable);
|
2018-03-08 17:37:49 +08:00
|
|
|
}
|
2016-06-22 19:49:49 +08:00
|
|
|
|
2015-11-23 16:40:54 +08:00
|
|
|
/**
|
|
|
|
|
* Helper method to report an error
|
2018-11-05 06:34:31 +08:00
|
|
|
* @param monitor Monitor to check validity of "this"
|
2015-11-23 16:40:54 +08:00
|
|
|
* @return bool Was an error reported?
|
|
|
|
|
*/
|
2018-11-05 06:34:31 +08:00
|
|
|
bool reportError(const Monitor &monitor)
|
2015-11-23 16:40:54 +08:00
|
|
|
{
|
|
|
|
|
// some errors are ok and do not (necessarily) mean that we're disconnected
|
|
|
|
|
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return false;
|
2018-11-05 06:34:31 +08:00
|
|
|
|
|
|
|
|
// tell the connection that it failed
|
|
|
|
|
// @todo we should report an error, but that could be wrong, because it calls back to us
|
|
|
|
|
|
|
|
|
|
// we're no longer interested in the socket (this also calls onClosed())
|
|
|
|
|
cleanup();
|
2015-11-23 16:40:54 +08:00
|
|
|
|
|
|
|
|
// done
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Construct the next state
|
|
|
|
|
* @param monitor Object that monitors whether connection still exists
|
|
|
|
|
* @return TcpState*
|
|
|
|
|
*/
|
|
|
|
|
TcpState *nextState(const Monitor &monitor)
|
|
|
|
|
{
|
|
|
|
|
// if the object is still in a valid state, we can move to the close-state,
|
|
|
|
|
// otherwise there is no point in moving to a next state
|
|
|
|
|
return monitor.valid() ? new TcpClosed(this) : nullptr;
|
|
|
|
|
}
|
|
|
|
|
|
2015-11-01 01:26:04 +08:00
|
|
|
public:
|
|
|
|
|
/**
|
|
|
|
|
* Constructor
|
2018-11-05 06:34:31 +08:00
|
|
|
* @param state The previous state
|
2015-11-01 01:26:04 +08:00
|
|
|
* @param buffer The buffer that was already built
|
|
|
|
|
*/
|
2018-11-05 06:34:31 +08:00
|
|
|
TcpConnected(TcpExtState *state, TcpOutBuffer &&buffer) :
|
|
|
|
|
TcpExtState(state),
|
2016-06-16 01:32:30 +08:00
|
|
|
_out(std::move(buffer)),
|
|
|
|
|
_in(4096)
|
2015-11-01 01:26:04 +08:00
|
|
|
{
|
|
|
|
|
// if there is already an output buffer, we have to send out that first
|
|
|
|
|
if (_out) _out.sendto(_socket);
|
|
|
|
|
|
|
|
|
|
// tell the handler to monitor the socket, if there is an out
|
2018-11-05 06:34:31 +08:00
|
|
|
_parent->onIdle(this, _socket, _out ? readable | writable : readable);
|
2015-11-01 01:26:04 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Destructor
|
|
|
|
|
*/
|
2018-11-05 06:34:31 +08:00
|
|
|
virtual ~TcpConnected() noexcept = default;
|
2017-12-13 00:10:51 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* The filedescriptor of this connection
|
|
|
|
|
* @return int
|
|
|
|
|
*/
|
|
|
|
|
virtual int fileno() const override { return _socket; }
|
2018-04-01 17:54:05 +08:00
|
|
|
|
|
|
|
|
/**
|
2018-04-02 04:34:15 +08:00
|
|
|
* Number of bytes in the outgoing buffer
|
|
|
|
|
* @return std::size_t
|
2018-04-01 17:54:05 +08:00
|
|
|
*/
|
2018-04-02 04:34:15 +08:00
|
|
|
virtual std::size_t queued() const override { return _out.size(); }
|
2018-04-01 17:54:05 +08:00
|
|
|
|
2015-11-01 01:26:04 +08:00
|
|
|
/**
|
|
|
|
|
* Process the filedescriptor in the object
|
2018-03-08 17:02:42 +08:00
|
|
|
* @param monitor Monitor to check if the object is still alive
|
2015-11-01 01:26:04 +08:00
|
|
|
* @param fd Filedescriptor that is active
|
|
|
|
|
* @param flags AMQP::readable and/or AMQP::writable
|
|
|
|
|
* @return New state object
|
|
|
|
|
*/
|
2018-03-08 17:02:42 +08:00
|
|
|
virtual TcpState *process(const Monitor &monitor, int fd, int flags) override
|
2015-11-01 01:26:04 +08:00
|
|
|
{
|
|
|
|
|
// must be the socket
|
|
|
|
|
if (fd != _socket) return this;
|
2015-11-23 16:40:54 +08:00
|
|
|
|
2015-11-01 01:26:04 +08:00
|
|
|
// can we write more data to the socket?
|
|
|
|
|
if (flags & writable)
|
|
|
|
|
{
|
|
|
|
|
// send out the buffered data
|
2015-11-23 16:40:54 +08:00
|
|
|
auto result = _out.sendto(_socket);
|
|
|
|
|
|
|
|
|
|
// are we in an error state?
|
2018-11-05 06:34:31 +08:00
|
|
|
if (result < 0 && reportError(monitor)) return nextState(monitor);
|
2015-11-01 01:26:04 +08:00
|
|
|
|
|
|
|
|
// if buffer is empty by now, we no longer have to check for
|
|
|
|
|
// writability, but only for readability
|
2018-11-05 06:34:31 +08:00
|
|
|
if (!_out) _parent->onIdle(this, _socket, readable);
|
2015-11-01 01:26:04 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// should we check for readability too?
|
|
|
|
|
if (flags & readable)
|
|
|
|
|
{
|
|
|
|
|
// read data from buffer
|
2018-11-05 06:34:31 +08:00
|
|
|
ssize_t result = _in.receivefrom(_socket, _parent->expected());
|
2015-11-01 01:26:04 +08:00
|
|
|
|
2015-11-23 16:40:54 +08:00
|
|
|
// are we in an error state?
|
2018-11-05 06:34:31 +08:00
|
|
|
if (result < 0 && reportError(monitor)) return nextState(monitor);
|
|
|
|
|
|
|
|
|
|
// @todo should we also check for result == 0
|
2016-06-16 02:50:33 +08:00
|
|
|
|
2015-11-23 16:40:54 +08:00
|
|
|
// we need a local copy of the buffer - because it is possible that "this"
|
|
|
|
|
// object gets destructed halfway through the call to the parse() method
|
2016-06-16 01:32:30 +08:00
|
|
|
TcpInBuffer buffer(std::move(_in));
|
2015-11-01 16:43:17 +08:00
|
|
|
|
2015-11-23 16:40:54 +08:00
|
|
|
// parse the buffer
|
2018-11-05 06:34:31 +08:00
|
|
|
auto processed = _parent->onReceived(this, buffer);
|
2015-11-01 16:43:17 +08:00
|
|
|
|
2015-11-23 16:40:54 +08:00
|
|
|
// "this" could be removed by now, check this
|
|
|
|
|
if (!monitor.valid()) return nullptr;
|
|
|
|
|
|
|
|
|
|
// shrink buffer
|
|
|
|
|
buffer.shrink(processed);
|
|
|
|
|
|
|
|
|
|
// restore the buffer as member
|
2016-06-16 02:50:33 +08:00
|
|
|
_in = std::move(buffer);
|
2016-06-22 19:49:49 +08:00
|
|
|
|
|
|
|
|
// do we have to reallocate?
|
2018-03-06 05:24:19 +08:00
|
|
|
if (_reallocate) _in.reallocate(_reallocate);
|
|
|
|
|
|
|
|
|
|
// we can remove the reallocate instruction
|
|
|
|
|
_reallocate = 0;
|
2015-11-01 01:26:04 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// keep same object
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Send data over the connection
|
|
|
|
|
* @param buffer buffer to send
|
|
|
|
|
* @param size size of the buffer
|
|
|
|
|
*/
|
2015-11-28 18:38:15 +08:00
|
|
|
virtual void send(const char *buffer, size_t size) override
|
2015-11-01 01:26:04 +08:00
|
|
|
{
|
|
|
|
|
// is there already a buffer of data that can not be sent?
|
|
|
|
|
if (_out) return _out.add(buffer, size);
|
2016-07-06 19:03:51 +08:00
|
|
|
|
2015-11-01 01:26:04 +08:00
|
|
|
// there is no buffer, send the data right away
|
2016-07-11 02:44:14 +08:00
|
|
|
auto result = ::send(_socket, buffer, size, AMQP_CPP_MSG_NOSIGNAL);
|
2016-07-06 19:03:51 +08:00
|
|
|
|
2015-11-01 01:26:04 +08:00
|
|
|
// number of bytes sent
|
|
|
|
|
size_t bytes = result < 0 ? 0 : result;
|
|
|
|
|
|
|
|
|
|
// ok if all data was sent
|
|
|
|
|
if (bytes >= size) return;
|
|
|
|
|
|
|
|
|
|
// add the data to the buffer
|
|
|
|
|
_out.add(buffer + bytes, size - bytes);
|
|
|
|
|
|
|
|
|
|
// start monitoring the socket to find out when it is writable
|
2018-11-05 06:34:31 +08:00
|
|
|
_parent->onIdle(this, _socket, readable | writable);
|
2015-11-01 01:26:04 +08:00
|
|
|
}
|
2016-09-19 16:19:06 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Flush the connection, sent all buffered data to the socket
|
2018-03-08 17:02:42 +08:00
|
|
|
* @param monitor Object to check if connection still lives
|
2016-09-19 16:19:06 +08:00
|
|
|
* @return TcpState new tcp state
|
|
|
|
|
*/
|
2018-03-08 17:02:42 +08:00
|
|
|
virtual TcpState *flush(const Monitor &monitor) override
|
2016-09-19 16:19:06 +08:00
|
|
|
{
|
2018-03-06 15:40:44 +08:00
|
|
|
// create an object to wait for the filedescriptor to becomes active
|
2018-06-15 15:19:27 +08:00
|
|
|
Poll poll(_socket);
|
2018-03-06 00:29:37 +08:00
|
|
|
|
2018-03-07 05:03:53 +08:00
|
|
|
// keep running until the out buffer is not empty
|
2016-09-19 16:19:06 +08:00
|
|
|
while (_out)
|
|
|
|
|
{
|
|
|
|
|
// poll the socket, is it already writable?
|
2018-06-15 15:19:27 +08:00
|
|
|
if (!poll.writable(true)) return this;
|
2016-09-19 16:19:06 +08:00
|
|
|
|
|
|
|
|
// socket is writable, send as much data as possible
|
2018-03-08 17:02:42 +08:00
|
|
|
auto *newstate = process(monitor, _socket, writable);
|
2016-09-19 16:19:06 +08:00
|
|
|
|
|
|
|
|
// are we done
|
|
|
|
|
if (newstate != this) return newstate;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// all has been sent
|
|
|
|
|
return this;
|
|
|
|
|
}
|
2018-10-29 01:13:13 +08:00
|
|
|
|
2016-06-16 01:32:30 +08:00
|
|
|
/**
|
2018-11-05 06:34:31 +08:00
|
|
|
* When the AMQP transport layer is closed
|
|
|
|
|
* @param monitor Object that can be used if connection is still alive
|
|
|
|
|
* @return TcpState New implementation object
|
2016-06-16 01:32:30 +08:00
|
|
|
*/
|
2018-11-05 06:34:31 +08:00
|
|
|
virtual TcpState *onAmqpClosed(const Monitor &monitor) override
|
2016-06-16 01:32:30 +08:00
|
|
|
{
|
2018-11-05 06:34:31 +08:00
|
|
|
// move to the tcp shutdown state
|
|
|
|
|
return new TcpShutdown(this);
|
2016-06-16 01:32:30 +08:00
|
|
|
}
|
2017-04-26 21:33:17 +08:00
|
|
|
|
|
|
|
|
/**
|
2018-11-05 06:34:31 +08:00
|
|
|
* When an error occurs in the AMQP protocol
|
|
|
|
|
* @param monitor Monitor that can be used to check if the connection is still alive
|
|
|
|
|
* @param message The error message
|
|
|
|
|
* @return TcpState New implementation object
|
2017-04-26 21:33:17 +08:00
|
|
|
*/
|
2018-11-05 06:34:31 +08:00
|
|
|
virtual TcpState *onAmqpError(const Monitor &monitor, const char *message) override
|
2017-04-26 21:33:17 +08:00
|
|
|
{
|
2018-11-05 06:34:31 +08:00
|
|
|
// tell the user about it
|
|
|
|
|
// @todo do this somewhere else
|
|
|
|
|
//_handler->onError(_connection, message);
|
2017-04-26 21:33:17 +08:00
|
|
|
|
2018-11-05 06:34:31 +08:00
|
|
|
// stop if the object was destructed
|
|
|
|
|
if (!monitor.valid()) return nullptr;
|
2018-03-08 17:37:49 +08:00
|
|
|
|
2018-11-05 06:34:31 +08:00
|
|
|
// move to the tcp shutdown state
|
|
|
|
|
return new TcpShutdown(this);
|
2018-03-08 17:37:49 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2018-11-05 06:34:31 +08:00
|
|
|
* Install max-frame size
|
|
|
|
|
* @param heartbeat suggested heartbeat
|
2018-03-08 17:37:49 +08:00
|
|
|
*/
|
2018-11-05 06:34:31 +08:00
|
|
|
virtual void maxframe(size_t maxframe) override
|
2018-03-08 17:37:49 +08:00
|
|
|
{
|
2018-11-05 06:34:31 +08:00
|
|
|
// remember that we have to reallocate (_in member can not be accessed because it is moved away)
|
|
|
|
|
_reallocate = maxframe;
|
2017-04-26 21:33:17 +08:00
|
|
|
}
|
2015-11-01 01:26:04 +08:00
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* End of namespace
|
|
|
|
|
*/
|
|
|
|
|
}
|
|
|
|
|
|