Skip to content

Commit

Permalink
cool#9833: Mitigate connection count limitation, fix timeout-checking…
Browse files Browse the repository at this point in the history
… code (*WIP*)

This commit is a WIP, pushed for discussion and to be amended.

- ProtocolHandlerInterface::checkTimeout(..)
  - Add bool return value: true -> shutdown connection, caller shall stop processing
  - Implemented for http::Session
    - Timeout (30s) with missing response
  - Implemented for WebSocketHandler
    - Timeout (64s = SocketPoll::DefaultPollTimeoutMicroS)
      after missing pong (server only)

- StreamSocket -> Socket (properties moved)
  - bytes sent/received
  - closed state

- Socket (added properties)
  - creation- and last-seen -time
  - socket type and port
  - checkForcedRemoval(..) *WIP*
    - called directly from SocketPoll::poll()
    - only for IPv4/v6 network connections
    - similar to ProtocolHandlerInterface::checkTimeout(..)
    - added further criteria (age, throughput, ..)
      - Timeout (64s = SocketPoll::DefaultPollTimeoutMicroS)
        if (now - lastSeen) > timeout
      - Timeout (12 hours)
        if (now - creationTime) > timeout
      - TODO: Add maximimal IPv4/IPv6 socket-count criteria, drop oldest.

- SocketPoll::poll()
  - Additionally erases if !socket->isOpen() || socket->checkForcedRemoval()

- TODO
  - Facility to configure timeouts, at least for testing!
  - More elaborated tests
    - WebSocket
    - ..

Signed-off-by: Sven Göthel <[email protected]>
Change-Id: I7e1a9329e0848c40a210f6250e29e26950da6fbc
  • Loading branch information
Sven Göthel committed Aug 28, 2024
1 parent 595257b commit 3ef99c5
Show file tree
Hide file tree
Showing 9 changed files with 508 additions and 102 deletions.
4 changes: 2 additions & 2 deletions common/Util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,7 @@ namespace Util
#endif
if (0 == _frames.size())
{
_frames.emplace_back(nullptr, "empty");
_frames.emplace_back(nullptr, Symbol{"n/a", "empty", "0x00", ""});
}
}

Expand All @@ -1129,7 +1129,7 @@ namespace Util
}
return os;
}
constexpr std::string Backtrace::toString() const noexcept
std::string Backtrace::toString() const noexcept
{
std::string s = "Backtrace:\n";
int fidx = skipFrames;
Expand Down
2 changes: 1 addition & 1 deletion common/Util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1287,7 +1287,7 @@ int main(int argc, char**argv)
std::ostream& send(std::ostream& os) const;

/// Produces a string representation, one line per frame
constexpr std::string toString() const noexcept;
std::string toString() const noexcept;

constexpr size_t size() const noexcept { return _frames.size(); }
constexpr const Symbol& operator[](size_t idx) const noexcept
Expand Down
24 changes: 19 additions & 5 deletions net/HttpRequest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1201,7 +1201,7 @@ class Session final : public ProtocolHandlerInterface
/// Returns the default timeout.
static constexpr std::chrono::milliseconds getDefaultTimeout()
{
return std::chrono::seconds(30);
return std::chrono::seconds(30); // FIXME?
}

/// Returns the current protocol scheme.
Expand Down Expand Up @@ -1432,7 +1432,10 @@ class Session final : public ProtocolHandlerInterface
while (!_response->done())
{
const auto now = std::chrono::steady_clock::now();
checkTimeout(now);
if (checkTimeout(now))
{
return false;
}

const auto remaining =
std::chrono::duration_cast<std::chrono::microseconds>(deadline - now);
Expand Down Expand Up @@ -1692,14 +1695,18 @@ class Session final : public ProtocolHandlerInterface
net::asyncConnect(_host, _port, isSecure(), shared_from_this(), pushConnectCompleteToPoll);
}

void checkTimeout(std::chrono::steady_clock::time_point now) override
bool checkTimeout(std::chrono::steady_clock::time_point now) override
{
if (!_response || _response->done())
return;
{
return false;
}

const std::chrono::microseconds timeout = getTimeout();
const auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(now - _startTime);
if (now < _startTime || duration > getTimeout() || SigUtil::getTerminationFlag())

if (now < _startTime || duration > timeout || SigUtil::getTerminationFlag())
{
LOG_WRN("Timed out while requesting [" << _request.getVerb() << ' ' << _host
<< _request.getUrl() << "] after " << duration);
Expand All @@ -1712,7 +1719,14 @@ class Session final : public ProtocolHandlerInterface
// no good maintaining a poor connection (if that's the issue).
onDisconnect(); // Trigger manually (why wait for poll to do it?).
assert(isConnected() == false);
return true;
} else {
// FIXME: Remove!
LOG_DBG("Timeout check while requesting [" << _request.getVerb() << ' ' << _host
<< _request.getUrl() << "] after "
<< duration << " <= " << timeout);
}
return false;
}

int sendTextMessage(const char*, const size_t, bool) const override { return 0; }
Expand Down
155 changes: 144 additions & 11 deletions net/Socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ std::unique_ptr<Watchdog> SocketPoll::PollWatchdog;

#define SOCKET_ABSTRACT_UNIX_NAME "0coolwsd-"

std::string Socket::toString(Type t) noexcept
{
switch (t)
{
case Type::IPv4:
return "IPv4";
case Type::IPv6:
return "IPv6";
case Type::All:
return "All";
case Type::Unix:
return "Unix";
}
return "Unknown";
}

int Socket::createSocket([[maybe_unused]] Socket::Type type)
{
#if !MOBILEAPP
Expand All @@ -86,9 +102,57 @@ int Socket::createSocket([[maybe_unused]] Socket::Type type)
#endif
}

const std::string Socket::getClientAddressAndPort() const noexcept
{
std::string s;
if (Type::IPv6 == type())
{
s.append("[").append(clientAddress()).append("]:").append(std::to_string(clientPort()));
}
else
{
s.append(clientAddress()).append(":").append(std::to_string(clientPort()));
}
return s;
}

std::string Socket::getStatsString(std::chrono::steady_clock::time_point now) const noexcept
{
const auto durTotal = std::chrono::duration_cast<std::chrono::milliseconds>(now - _creationTime);
const auto durLast = std::chrono::duration_cast<std::chrono::milliseconds>(now - _lastSeenTime);

float kBpsIn, kBpsOut;
if (durTotal.count() > 0)
{
kBpsIn = (float)_bytesRcvd / (float)durTotal.count();
kBpsOut = (float)_bytesSent / (float)durTotal.count();
}
else
{
kBpsIn = (float)_bytesRcvd / 1000.0f;
kBpsOut = (float)_bytesSent / 1000.0f;
}
return Util::formatString("Socket[#%d, dur[total %" PRIi64 "ms, last %" PRIi64
"ms], kBps[in %.1f, out %.1f], %s @ %s]",
getFD(), durTotal.count(), durLast.count(), kBpsIn, kBpsOut,
toString(type()).c_str(),
getClientAddressAndPort().c_str());
}

std::string Socket::toString() const noexcept
{
std::string s("Socket[#");
s.append(std::to_string(getFD()))
.append(", ")
.append(toString(type()))
.append(" @ ")
.append(getClientAddressAndPort())
.append("]");
return s;
}

bool StreamSocket::socketpair(std::shared_ptr<StreamSocket> &parent,
std::shared_ptr<StreamSocket> &child)
bool StreamSocket::socketpair(std::shared_ptr<StreamSocket>& parent,
std::shared_ptr<StreamSocket>& child)
{
#if MOBILEAPP
return false;
Expand Down Expand Up @@ -412,7 +476,7 @@ int SocketPoll::poll(int64_t timeoutMaxMicroS)
socketErrorCount++;
#endif

std::chrono::steady_clock::time_point now =
const std::chrono::steady_clock::time_point now =
std::chrono::steady_clock::now();

// The events to poll on change each spin of the loop.
Expand Down Expand Up @@ -528,7 +592,7 @@ int SocketPoll::poll(int64_t timeoutMaxMicroS)
assert(!_pollSockets.empty() && "All existing sockets disappeared from the SocketPoll");

// Fire the poll callbacks and remove dead fds.
std::chrono::steady_clock::time_point newNow = std::chrono::steady_clock::now();
const std::chrono::steady_clock::time_point newNow = std::chrono::steady_clock::now();

// We use the _pollStartIndex to start the polling at a different index each time. Do some
// sanity check first to handle the case where we removed one or several sockets last time.
Expand All @@ -548,7 +612,23 @@ int SocketPoll::poll(int64_t timeoutMaxMicroS)
else if (!_pollSockets[i])
{
// removed in a callback
itemsErased++;
++itemsErased;
}
else if (!_pollSockets[i]->isOpen())
{
// closed socket ..
++itemsErased;
LOGA_TRC(Socket, '#' << _pollFds[i].fd << ": Removing socket (at " << i
<< " of " << _pollSockets.size() << ") from " << _name);
_pollSockets[i] = nullptr;
}
else if( _pollSockets[i]->checkForcedRemoval(newNow) )
{
// timed out socket ..
++itemsErased;
LOGA_TRC(Socket, '#' << _pollFds[i].fd << ": Removing socket (at " << i
<< " of " << _pollSockets.size() << ") from " << _name);
_pollSockets[i] = nullptr;
}
else if (_pollFds[i].fd == _pollSockets[i]->getFD())
{
Expand All @@ -569,9 +649,16 @@ int SocketPoll::poll(int64_t timeoutMaxMicroS)
rc = -1;
}

if (!disposition.isContinue())
if (!_pollSockets[i]->isOpen() || !disposition.isContinue())
{
itemsErased++;
// Potentially via ProtocolHandlerInterface::handlePoll()'s
// - ProtocolHandlerInterface::checkTimeout()
// - ProtocolHandlerInterface::onDisconnect())
// - disposition.setClosed()
// ProtocolHandlerInterface
// - http::Session::handlePoll() OK
// - WebSocketHandler::handlePoll() OK
++itemsErased;
LOGA_TRC(Socket, '#' << _pollFds[i].fd << ": Removing socket (at " << i
<< " of " << _pollSockets.size() << ") from " << _name);
_pollSockets[i] = nullptr;
Expand Down Expand Up @@ -933,8 +1020,8 @@ void StreamSocket::dumpState(std::ostream& os)
const int events = getPollEvents(std::chrono::steady_clock::now(), timeoutMaxMicroS);
os << '\t' << std::setw(6) << getFD() << "\t0x" << std::hex << events << std::dec << '\t'
<< (ignoringInput() ? "ignore\t" : "process\t") << std::setw(6) << _inBuffer.size() << '\t'
<< std::setw(6) << _outBuffer.size() << '\t' << " r: " << std::setw(6) << _bytesRecvd
<< "\t w: " << std::setw(6) << _bytesSent << '\t' << clientAddress() << '\t';
<< std::setw(6) << _outBuffer.size() << '\t' << " r: " << std::setw(6) << getBytesRcvd()
<< "\t w: " << std::setw(6) << getBytesSent() << '\t' << clientAddress() << '\t';
_socketHandler->dumpState(os);
if (_inBuffer.size() > 0)
Util::dumpHex(os, _inBuffer, "\t\tinBuffer:\n", "\t\t");
Expand Down Expand Up @@ -1097,7 +1184,7 @@ std::shared_ptr<Socket> ServerSocket::accept()
std::shared_ptr<Socket> _socket = createSocketFromAccept(rc, type);

inet_ntop(clientInfo.sin6_family, inAddr, addrstr, sizeof(addrstr));
_socket->setClientAddress(addrstr);
_socket->setClientAddress(addrstr, clientInfo.sin6_port);

LOG_TRC("Accepted socket #" << _socket->getFD() << " has family "
<< clientInfo.sin6_family << " address "
Expand Down Expand Up @@ -1298,6 +1385,52 @@ LocalServerSocket::~LocalServerSocket()
# define LOG_CHUNK(X)
#endif

bool StreamSocket::checkForcedRemoval(std::chrono::steady_clock::time_point now) noexcept
{
if ( !isIPType() ) // forced removal on IPv[46] network connections only
{
return false;
}
const std::chrono::microseconds timeoutMax = DefaultMaxConnectionTimMicroS; // FIXME?
const std::chrono::microseconds timeoutLast = SocketPoll::DefaultPollTimeoutMicroS; // FIXME?
const float minBytesPerSec = DefaultMinBytesPerSec; // FIXME?

const auto durTotal =
std::chrono::duration_cast<std::chrono::milliseconds>(now - getCreationTime());
const auto durLast =
std::chrono::duration_cast<std::chrono::milliseconds>(now - getLastSeenTime());
const float bytesPerSecIn = durTotal.count() > 0 ? (float)getBytesRcvd() / ((float)durTotal.count() / 1000.0f) : (float)getBytesRcvd();
if (now < getCreationTime() || durTotal > timeoutMax || durLast > timeoutLast ||
(bytesPerSecIn > 0.0f && bytesPerSecIn < minBytesPerSec) || SigUtil::getTerminationFlag())
{
LOG_WRN("Timed out socket after " << durTotal << ", " << getStatsString(now));

if (_socketHandler)
{
_socketHandler->onDisconnect();
if( isOpen() ) {
// FIXME: Ensure proper semantics of onDisconnect()
LOG_WRN("Socket still open post onDisconnect(), forced shutdown.");
shutdown(); // signal
closeConnection(); // real -> setClosed()
assert(isOpen() == false); // should have issued shutdown
}
}
else
{
shutdown(); // signal
closeConnection(); // real -> setClosed()
assert(isOpen() == false); // should have issued shutdown
}
return true;
}
else
{
LOG_WRN("Timeout check socket after " << durTotal << ", " << getStatsString(now));
}
return false;
}

bool StreamSocket::parseHeader(const char *clientName,
Poco::MemoryInputStream &message,
Poco::Net::HTTPRequest &request,
Expand Down Expand Up @@ -1539,7 +1672,7 @@ bool StreamSocket::compactChunks(MessageMap& map)
bool StreamSocket::sniffSSL() const
{
// Only sniffing the first bytes of a socket.
if (_bytesSent > 0 || _bytesRecvd != _inBuffer.size() || _bytesRecvd < 6)
if (getBytesSent() > 0 || getBytesRcvd() != _inBuffer.size() || getBytesRcvd() < 6)
return false;

// 0x0000 16 03 01 02 00 01 00 01
Expand Down
Loading

0 comments on commit 3ef99c5

Please sign in to comment.