when a connection gets in error state, all deferred results will now also call their error callback, and the channel wide error handler is called _after_ all individual error handlers are called
This commit is contained in:
parent
59e0b61e6b
commit
1fecc57d67
|
|
@ -479,12 +479,6 @@ public:
|
||||||
if (!next) _newestCallback = nullptr;
|
if (!next) _newestCallback = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Report errors to all deferred objects already in an error state
|
|
||||||
* @param force Report errors even for objects not already in error state
|
|
||||||
*/
|
|
||||||
void reportErrors(bool force = false);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Report an error message on a channel
|
* Report an error message on a channel
|
||||||
* @param message
|
* @param message
|
||||||
|
|
@ -496,27 +490,38 @@ public:
|
||||||
|
|
||||||
// we are going to call callbacks that could destruct the channel
|
// we are going to call callbacks that could destruct the channel
|
||||||
Monitor monitor(this);
|
Monitor monitor(this);
|
||||||
|
|
||||||
|
// call the oldest
|
||||||
|
if (_oldestCallback)
|
||||||
|
{
|
||||||
|
// call the callback
|
||||||
|
auto *next = _oldestCallback->reportError(message);
|
||||||
|
|
||||||
|
// leap out if channel no longer exists
|
||||||
|
if (!monitor.valid()) return;
|
||||||
|
|
||||||
|
// set the oldest callback
|
||||||
|
_oldestCallback.reset(next);
|
||||||
|
}
|
||||||
|
|
||||||
|
// clean up all deferred other objects
|
||||||
|
while (_oldestCallback)
|
||||||
|
{
|
||||||
|
// call the callback
|
||||||
|
auto *next = _oldestCallback->reportError("Channel is in error state");
|
||||||
|
|
||||||
|
// leap out if channel no longer exists
|
||||||
|
if (!monitor.valid()) return;
|
||||||
|
|
||||||
|
// set the oldest callback
|
||||||
|
_oldestCallback.reset(next);
|
||||||
|
}
|
||||||
|
|
||||||
|
// all callbacks have been processed, so we also can reset the pointer to the newest
|
||||||
|
_newestCallback = nullptr;
|
||||||
|
|
||||||
// inform handler
|
// inform handler
|
||||||
if (_errorCallback) _errorCallback(message);
|
if (_errorCallback) _errorCallback(message);
|
||||||
|
|
||||||
// leap out if channel is already destructed, or when there are no further callbacks
|
|
||||||
if (!monitor.valid() || !_oldestCallback) return;
|
|
||||||
|
|
||||||
// call the callback
|
|
||||||
auto *next = _oldestCallback->reportError(message);
|
|
||||||
|
|
||||||
// leap out if channel no longer exists
|
|
||||||
if (!monitor.valid()) return;
|
|
||||||
|
|
||||||
// set the oldest callback
|
|
||||||
_oldestCallback.reset(next);
|
|
||||||
|
|
||||||
// if there was no next callback, the newest callback was just used
|
|
||||||
if (!next) _newestCallback = nullptr;
|
|
||||||
|
|
||||||
// when one error occured, all subsequent messages are in an error state too
|
|
||||||
reportErrors(true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -277,6 +277,19 @@ public:
|
||||||
{
|
{
|
||||||
// set connection state to closed
|
// set connection state to closed
|
||||||
_state = state_closed;
|
_state = state_closed;
|
||||||
|
|
||||||
|
// monitor because every callback could invalidate the connection
|
||||||
|
Monitor monitor(this);
|
||||||
|
|
||||||
|
// all deferred result objects in the channels should report this error too
|
||||||
|
for (auto &iter : _channels)
|
||||||
|
{
|
||||||
|
// report the errors
|
||||||
|
iter.second->reportError(message);
|
||||||
|
|
||||||
|
// leap out if no longer valid
|
||||||
|
if (!monitor.valid()) return;
|
||||||
|
}
|
||||||
|
|
||||||
// inform handler
|
// inform handler
|
||||||
_handler->onError(_parent, message);
|
_handler->onError(_parent, message);
|
||||||
|
|
|
||||||
|
|
@ -119,32 +119,6 @@ Deferred &ChannelImpl::push(const Frame &frame)
|
||||||
return push(new Deferred(send(frame)));
|
return push(new Deferred(send(frame)));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Report errors to all deferred objects already in an error state
|
|
||||||
* @param force Report errors even for objects not already in error state
|
|
||||||
*/
|
|
||||||
void ChannelImpl::reportErrors(bool force)
|
|
||||||
{
|
|
||||||
// keep looping for as long as the oldest callback is in an error state
|
|
||||||
while (_oldestCallback && (force || !*_oldestCallback))
|
|
||||||
{
|
|
||||||
// construct monitor, because channel could be destructed
|
|
||||||
Monitor monitor(this);
|
|
||||||
|
|
||||||
// report the error
|
|
||||||
auto *next = _oldestCallback->reportError("Frame could not be delivered");
|
|
||||||
|
|
||||||
// leap out if object is no longer valid after the callback was called
|
|
||||||
if (!monitor.valid()) return;
|
|
||||||
|
|
||||||
// install the next deferred object
|
|
||||||
_oldestCallback.reset(next);
|
|
||||||
|
|
||||||
// was this also the newest callback
|
|
||||||
if (!next) _newestCallback = nullptr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pause deliveries on a channel
|
* Pause deliveries on a channel
|
||||||
*
|
*
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue