Skip to content

Commit

Permalink
Give each client its own thread
Browse files Browse the repository at this point in the history
  • Loading branch information
Sjors committed Mar 5, 2025
1 parent 5fe5296 commit 6efae13
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 103 deletions.
210 changes: 107 additions & 103 deletions src/sv2/template_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,39 +170,68 @@ void Sv2TemplateProvider::ThreadSv2Handler()
std::this_thread::sleep_for(1000ms);
}

Timer timer(m_options.fee_check_interval);
std::map<size_t, std::thread> client_threads;

while (!m_flag_interrupt_sv2) {
// We start with one template per client, which has an interface through
// which we monitor for better templates.

// TODO: give each client its own thread so they're treated equally
// and so that newly connected clients don't have to wait.
std::optional<size_t> first_client_id{};
m_connman->ForEachClient([this, &first_client_id](Sv2Client& client) {
m_connman->ForEachClient([this, &client_threads](Sv2Client& client) {
// The initial handshake is handled on the Sv2Connman thread. This
// consists of the noise protocol handshake and the initial Stratum
// v2 messages SetupConnection and CoinbaseOutputConstraints.
if (!client.m_coinbase_output_constraints_recv) return;

if (!first_client_id) {
first_client_id = client.m_id;
}
if (client_threads.contains(client.m_id)) return;

client_threads.emplace(client.m_id,
std::thread(&util::TraceThread,
strprintf("sv2-%zu", client.m_id),
[this, &client] { ThreadSv2ClientHandler(client); }));
});

// Take a break (handling new connections is not urgent)
std::this_thread::sleep_for(100ms);

LOCK(m_tp_mutex);
PruneBlockTemplateCache();
}

for (auto& thread : client_threads) {
if (thread.second.joinable()) {
// If the node is shutting down, then all pending waitNext() calls
// should return in under a second.
thread.second.join();
}
}


}

void Sv2TemplateProvider::ThreadSv2ClientHandler(Sv2Client& client)
{
// TODO: hacky way to keep track of this client still exists
const size_t client_id{client.m_id};

Timer timer(m_options.fee_check_interval);
std::shared_ptr<BlockTemplate> block_template;

while (!m_flag_interrupt_sv2) {
if (!block_template) {
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Generate initial block template for client id=%zu\n",
client.m_id);

// Check if we already have a template interface for this client
if (client.m_best_template_id != 0) {
LOCK(m_tp_mutex);
auto cached_template = m_block_template_cache.find(client.m_best_template_id);
if (cached_template != m_block_template_cache.end()) return;
}

// Create block template and store interface reference
// TODO: reuse template_id for clients with the same m_default_coinbase_tx_additional_output_size
// TODO: reuse template_id for clients with the same coinbase constraints
uint64_t template_id{WITH_LOCK(m_tp_mutex, return ++m_template_id;)};

// The node enforces a minimum of 2000, though not for IPC so we could go a bit
// lower, but let's not...
uint32_t block_reserved_weight{2000 + client.m_coinbase_tx_outputs_size * 4};

const auto time_start{SteadyClock::now()};
auto block_template = m_mining.createNewBlock({.use_mempool = true, .block_reserved_weight = block_reserved_weight});
block_template = m_mining.createNewBlock({.use_mempool = true, .block_reserved_weight = block_reserved_weight});
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Assemble template: %.2fms\n",
Ticks<MillisecondsDouble>(SteadyClock::now() - time_start));

Expand All @@ -222,112 +251,85 @@ void Sv2TemplateProvider::ThreadSv2Handler()
client.m_disconnect_flag = true;
}

timer.reset();

LOCK(m_tp_mutex);
m_block_template_cache.insert({template_id, std::move(block_template)});
m_block_template_cache.insert({template_id,block_template});
client.m_best_template_id = template_id;
});

// Do not send templates with improved fees more frequently than the fee check interval
const bool check_fees{timer.trigger()};
bool new_template{false};

// Delay event loop is no client if fully connected
if (!first_client_id) std::this_thread::sleep_for(1000ms);
}

// The future template flag is set when there's a new prevhash,
// not when there's only a fee increase.
bool future_template{false};

// For the first connected client, wait for fees to rise.
m_connman->ForEachClient([this, first_client_id, check_fees, &future_template, &new_template](Sv2Client& client) {
if (!first_client_id || client.m_id != first_client_id) return;
Assert(client.m_coinbase_output_constraints_recv);

std::shared_ptr<BlockTemplate> block_template = WITH_LOCK(m_tp_mutex, return m_block_template_cache.find(client.m_best_template_id)->second;);

CAmount fee_delta{check_fees ? m_options.fee_delta : MAX_MONEY};

// We give waitNext() a timeout of 1 second to prevent it from generating
// new templates too quickly. During this wait we're not serving newly connected clients.
// This can be cleaned up by having every client run its own thread.
const MillisecondsDouble tick{m_options.is_test ? 0 : 1000};
block_template = block_template->waitNext({.timeout = tick, .fee_threshold = fee_delta});
if (block_template) {
new_template = true;
uint256 prev_hash{block_template->getBlockHeader().hashPrevBlock};

{
LOCK(m_tp_mutex);
if (prev_hash != m_best_prev_hash) {
future_template = true;
m_best_prev_hash = prev_hash;
// Does not need to be accurate
m_last_block_time = GetTime<std::chrono::seconds>();
}

++m_template_id;
}
Assert(client.m_coinbase_output_constraints_recv);

// -sv2interval=N requires that we don't send fee updates until at least
// N seconds have gone by. So we first call waitNext() without a fee
// threshold, and then on the next while iteration we set it.
// TODO: add test coverage
const bool check_fees{m_options.is_test || timer.trigger()};

CAmount fee_delta{check_fees ? m_options.fee_delta : MAX_MONEY};

node::BlockWaitOptions options;
options.fee_threshold = fee_delta;
if (!check_fees) {
options.timeout = m_options.fee_check_interval;
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Ignore fee changes for -sv2interval seconds, wait for a new tip, client id=%zu\n",
client.m_id);
} else {
if (m_options.is_test) {
options.timeout = MillisecondsDouble(1000);
}
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Wait for fees to rise by %d sat or a new tip, client id=%zu\n",
fee_delta, client.m_id);
}

uint256 old_prev_hash{block_template->getBlockHeader().hashPrevBlock};
std::shared_ptr<BlockTemplate> tmpl = block_template->waitNext(options);

// Send it the updated template
if (!SendWork(client, WITH_LOCK(m_tp_mutex, return m_template_id;), *block_template, future_template)) {
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Disconnecting client id=%zu\n",
client.m_id);
client.m_disconnect_flag = true;
}
// TODO: handle disconnect without relying on undefined behavior
if (client.m_id != client_id) break;

if (tmpl) {
block_template = tmpl;
uint256 new_prev_hash{block_template->getBlockHeader().hashPrevBlock};

{
LOCK(m_tp_mutex);
m_block_template_cache.insert({m_template_id, std::move(block_template)});
client.m_best_template_id = m_template_id;
if (new_prev_hash != old_prev_hash) {
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Tip changed, client id=%zu\n",
client.m_id);
future_template = true;
m_best_prev_hash = new_prev_hash;
// Does not need to be accurate
m_last_block_time = GetTime<std::chrono::seconds>();
}

++m_template_id;
}

});
if (!SendWork(client, WITH_LOCK(m_tp_mutex, return m_template_id;), *block_template, future_template)) {
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Disconnecting client id=%zu\n",
client.m_id);
client.m_disconnect_flag = true;
}

if (new_template) {
// And generate new temlates for the other clients
m_connman->ForEachClient([this, first_client_id, &future_template](Sv2Client& client) {
if (!client.m_coinbase_output_constraints_recv) return;
if (client.m_id == first_client_id.value()) return;

std::shared_ptr<BlockTemplate> block_template = WITH_LOCK(m_tp_mutex, return m_block_template_cache.find(client.m_best_template_id)->second;);

const auto time_start{SteadyClock::now()};
// TODO: deduplicate
uint32_t block_reserved_weight{2000 + client.m_coinbase_tx_outputs_size * 4};

block_template = m_mining.createNewBlock({.use_mempool = true, .block_reserved_weight = block_reserved_weight});
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Assemble template: %.2fms\n",
Ticks<MillisecondsDouble>(SteadyClock::now() - time_start));

if (Assert(block_template)) {
{
LOCK(m_tp_mutex);
++m_template_id;
}

// Send it the updated template
if (!SendWork(client, WITH_LOCK(m_tp_mutex, return m_template_id;), *block_template, future_template)) {
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Disconnecting client id=%zu\n",
client.m_id);
client.m_disconnect_flag = true;
}

LOCK(m_tp_mutex);
m_block_template_cache.insert({m_template_id, std::move(block_template)});
client.m_best_template_id = m_template_id;
}
timer.reset();

});
LOCK(m_tp_mutex);
m_block_template_cache.insert({m_template_id, block_template});
client.m_best_template_id = m_template_id;
} else {
// In production this only happens during shutdown, in tests timeouts are expected.
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Timeout for client id=%zu\n",
client.m_id);
}

// Prune old templates and continue the loop.
LOCK(m_tp_mutex);
PruneBlockTemplateCache();

if (m_options.is_test) {
// Take a very short break
std::this_thread::sleep_for(1ms);
// Take a break
std::this_thread::sleep_for(50ms);
}
}
}
Expand Down Expand Up @@ -453,6 +455,8 @@ bool Sv2TemplateProvider::SendWork(Sv2Client& client, uint64_t template_id, Bloc
template_id,
future_template};

// TODO: use optimistic send instead of adding to the queue

LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Send 0x71 NewTemplate id=%lu future=%d to client id=%zu\n", template_id, future_template, client.m_id);
client.m_send_messages.emplace_back(new_template);

Expand Down
12 changes: 12 additions & 0 deletions src/sv2/template_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,18 @@ class Sv2TemplateProvider : public Sv2EventsInterface
*/
void ThreadSv2Handler() EXCLUSIVE_LOCKS_REQUIRED(!m_tp_mutex);

/**
* Give each client its own thread so they're treated equally
* and so that newly connected clients don't have to wait.
* This scales very poorly, because block template creation is
* slow, but is easier to reason about.
*
* A typical miner as well as a typical pool will only need one
* connection. For the use case of a public facing template provider,
* further changes are needed anyway e.g. for DoS resistance.
*/
void ThreadSv2ClientHandler(Sv2Client& client) EXCLUSIVE_LOCKS_REQUIRED(!m_tp_mutex);

/**
* Triggered on interrupt signals to stop the main event loop in ThreadSv2Handler().
*/
Expand Down
3 changes: 3 additions & 0 deletions src/test/sv2_template_provider_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@

#include <memory>

// For verbose debugging use:
// build/src/test/test_bitcoin --run_test=sv2_template_provider_tests --log_level=all -- -debug=sv2 -loglevel=sv2:trace -printtoconsole=1 | grep -v disabled

BOOST_FIXTURE_TEST_SUITE(sv2_template_provider_tests, TestChain100Setup)

/**
Expand Down

0 comments on commit 6efae13

Please sign in to comment.