diff --git a/include/amqpcpp/libev.h b/include/amqpcpp/libev.h index 76eea2f..17c6c34 100644 --- a/include/amqpcpp/libev.h +++ b/include/amqpcpp/libev.h @@ -255,8 +255,9 @@ private: // the server was inactive for a too long period of time, reset state _next = _expire = 0.0; _timeout = 0; - // close the connection because server was inactive - return (void)_connection->close(); + // close the connection because server was inactive (we close it with immediate effect, + // because it was inactive so we cannot trust it to respect the AMQP close handshake) + return (void)_connection->close(true); } else if (now >= _next) { @@ -365,8 +366,16 @@ private: // because the server has just sent us some data, we will update the expire time too _expire = now + _timeout * 1.5; + // stop the existing timer (we have to stop it and restart it, because ev_timer_set() + // on its own does not change the running timer) (note that we assume that the timer + // is already running and keeps on running, so no calls to ev_ref()/en_unref() here) + ev_timer_stop(_loop, &_timer); + // find the earliest thing that expires ev_timer_set(&_timer, std::min(_next, _expire) - now, 0.0); + + // and start it again + ev_timer_start(_loop, &_timer); // expose the accepted interval return _timeout; diff --git a/src/linux_tcp/tcpresolver.h b/src/linux_tcp/tcpresolver.h index d4af8de..0b0801c 100644 --- a/src/linux_tcp/tcpresolver.h +++ b/src/linux_tcp/tcpresolver.h @@ -120,6 +120,12 @@ private: // turn socket into a non-blocking socket and set the close-on-exec bit fcntl(_socket, F_SETFL, O_NONBLOCK | O_CLOEXEC); + + // we set the 'keepalive' option so that we automatically detect if the peer is dead + int keepalive = 1; + + // set the keepalive option + setsockopt(_socket, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive)); // try to connect non-blocking if (connect(_socket, addresses[i]->ai_addr, addresses[i]->ai_addrlen) == 0) break;