Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Builds/CMake/RippledCore.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,7 @@ if (tests)
subdir: net
#]===============================]
src/test/net/DatabaseDownloader_test.cpp
src/test/net/HTTPClient_test.cpp
#[===============================[
test sources:
subdir: nodestore
Expand Down
1 change: 1 addition & 0 deletions Builds/levelization/results/ordering.txt
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ test.ledger > ripple.ledger
test.ledger > ripple.protocol
test.ledger > test.jtx
test.ledger > test.toplevel
test.net > ripple.basics
test.net > ripple.net
test.net > test.jtx
test.net > test.toplevel
Expand Down
3 changes: 0 additions & 3 deletions src/ripple/net/RPCSub.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

#include <ripple/core/JobQueue.h>
#include <ripple/net/InfoSub.h>
#include <boost/asio/io_service.hpp>

namespace ripple {

Expand All @@ -39,11 +38,9 @@ class RPCSub : public InfoSub
explicit RPCSub(InfoSub::Source& source);
};

// VFALCO Why is the io_service needed?
std::shared_ptr<RPCSub>
make_RPCSub(
InfoSub::Source& source,
boost::asio::io_service& io_service,
JobQueue& jobQueue,
std::string const& strUrl,
std::string const& strUsername,
Expand Down
6 changes: 6 additions & 0 deletions src/ripple/net/impl/HTTPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,12 @@ class HTTPClientImp : public std::enable_shared_from_this<HTTPClientImp>,
if (mShutdown)
{
JLOG(j_.trace()) << "Complete.";

mResponse.commit(bytes_transferred);
std::string strBody{
{std::istreambuf_iterator<char>(&mResponse)},
std::istreambuf_iterator<char>()};
invokeComplete(ecResult, mStatus, mBody + strBody);
}
else
{
Expand Down
4 changes: 4 additions & 0 deletions src/ripple/net/impl/RPCCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1805,6 +1805,7 @@ rpcClient(
}

{
//@@start blocking-request
boost::asio::io_service isService;
RPCCall::fromNetwork(
isService,
Expand All @@ -1828,6 +1829,7 @@ rpcClient(
headers);
isService.run(); // This blocks until there are no more
// outstanding async calls.
//@@end blocking-request
}
if (jvOutput.isMember("result"))
{
Expand Down Expand Up @@ -1946,6 +1948,7 @@ fromNetwork(
// HTTP call?
auto constexpr RPC_NOTIFY = 30s;

//@@start async-request
HTTPClient::request(
bSSL,
io_service,
Expand All @@ -1970,6 +1973,7 @@ fromNetwork(
std::placeholders::_3,
j),
j);
//@@end async-request
}

} // namespace RPCCall
Expand Down
98 changes: 56 additions & 42 deletions src/ripple/net/impl/RPCSub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,12 @@ class RPCSubImp : public RPCSub
public:
RPCSubImp(
InfoSub::Source& source,
boost::asio::io_service& io_service,
JobQueue& jobQueue,
std::string const& strUrl,
std::string const& strUsername,
std::string const& strPassword,
Logs& logs)
: RPCSub(source)
, m_io_service(io_service)
, m_jobQueue(jobQueue)
, mUrl(strUrl)
, mSSL(false)
Expand Down Expand Up @@ -78,14 +76,14 @@ class RPCSubImp : public RPCSub
{
std::lock_guard sl(mLock);

// Wietse: we're not going to limit this, this is admin-port only, scale
// accordingly Dropping events just like this results in inconsistent
// data on the receiving end if (mDeque.size() >= eventQueueMax)
// {
// // Drop the previous event.
// JLOG(j_.warn()) << "RPCCall::fromNetwork drop";
// mDeque.pop_back();
// }
if (mDeque.size() >= maxQueueSize)
{
JLOG(j_.warn())
<< "RPCCall::fromNetwork drop: queue full (" << mDeque.size()
<< "), seq=" << mSeq << ", endpoint=" << mIp;
++mSeq;
return;
}

auto jm = broadcast ? j_.debug() : j_.info();
JLOG(jm) << "RPCCall::fromNetwork push: " << jvObj;
Expand Down Expand Up @@ -121,48 +119,49 @@ class RPCSubImp : public RPCSub
}

private:
// XXX Could probably create a bunch of send jobs in a single get of the
// lock.
// Maximum concurrent HTTP deliveries per batch. Bounds file
// descriptor usage while still allowing parallel delivery to
// capable endpoints. With a 1024 FD process limit shared across
// peers, clients, and the node store, 32 per subscriber is a
// meaningful but survivable chunk even with multiple subscribers.
static constexpr int maxInFlight = 32;

// Maximum queued events before dropping. At ~5-10KB per event
// this is ~80-160MB worst case — trivial memory-wise. The real
// purpose is detecting a hopelessly behind endpoint: at 100+
// events per ledger (every ~4s), 16384 events is ~10 minutes
// of buffer. Consumers detect gaps via the seq field.
static constexpr std::size_t maxQueueSize = 16384;

void
sendThread()
{
Json::Value jvEvent;
bool bSend;

do
{
// Local io_service per batch — cheap to create (just an
// internal event queue, no threads, no syscalls). Using a
// local io_service is what makes .run() block until exactly
// this batch completes, giving us flow control. Same
// pattern used by rpcClient() in RPCCall.cpp for CLI
// commands.
boost::asio::io_service io_service;
int dispatched = 0;

{
// Obtain the lock to manipulate the queue and change sending.
std::lock_guard sl(mLock);

if (mDeque.empty())
{
mSending = false;
bSend = false;
}
else
while (!mDeque.empty() && dispatched < maxInFlight)
{
auto const [seq, env] = mDeque.front();

mDeque.pop_front();

jvEvent = env;
Json::Value jvEvent = env;
jvEvent["seq"] = seq;

bSend = true;
}
}

// Send outside of the lock.
if (bSend)
{
// XXX Might not need this in a try.
try
{
JLOG(j_.info()) << "RPCCall::fromNetwork: " << mIp;

RPCCall::fromNetwork(
m_io_service,
io_service,
mIp,
mPort,
mUsername,
Expand All @@ -173,21 +172,38 @@ class RPCSubImp : public RPCSub
mSSL,
true,
logs_);
++dispatched;
}
catch (const std::exception& e)

if (dispatched == 0)
mSending = false;
}

bSend = dispatched > 0;

if (bSend)
{
try
{
JLOG(j_.info())
<< "RPCCall::fromNetwork: " << mIp << " dispatching "
<< dispatched << " events";
io_service.run();
}
catch (const std::exception& e)
{
JLOG(j_.warn())
<< "RPCCall::fromNetwork exception: " << e.what();
}
catch (...)
{
JLOG(j_.warn()) << "RPCCall::fromNetwork unknown exception";
}
}
} while (bSend);
}

private:
// Wietse: we're not going to limit this, this is admin-port only, scale
// accordingly enum { eventQueueMax = 32 };

boost::asio::io_service& m_io_service;
JobQueue& m_jobQueue;

std::string mUrl;
Expand Down Expand Up @@ -217,7 +233,6 @@ RPCSub::RPCSub(InfoSub::Source& source) : InfoSub(source, Consumer())
std::shared_ptr<RPCSub>
make_RPCSub(
InfoSub::Source& source,
boost::asio::io_service& io_service,
JobQueue& jobQueue,
std::string const& strUrl,
std::string const& strUsername,
Expand All @@ -226,7 +241,6 @@ make_RPCSub(
{
return std::make_shared<RPCSubImp>(
std::ref(source),
std::ref(io_service),
std::ref(jobQueue),
strUrl,
strUsername,
Expand Down
1 change: 0 additions & 1 deletion src/ripple/rpc/handlers/Subscribe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ doSubscribe(RPC::JsonContext& context)
{
auto rspSub = make_RPCSub(
context.app.getOPs(),
context.app.getIOService(),
context.app.getJobQueue(),
strUrl,
strUsername,
Expand Down
Loading
Loading