Skip to content

Commit

Permalink
Cache fanout candidates to optimize txreconciliation
Browse files Browse the repository at this point in the history
  • Loading branch information
naumenkogs committed Jan 10, 2024
1 parent d595e74 commit b54d8ec
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 15 deletions.
60 changes: 47 additions & 13 deletions src/node/txreconciliation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ constexpr size_t OUTBOUND_FANOUT_DESTINATIONS = 1;
* reconciliation sets and short ids mappings, and CPU used for sketch computation.
*/
constexpr size_t MAX_SET_SIZE = 3000;
/**
* Maximum number of transactions for which we store assigned fanout targets.
*/
constexpr size_t FANOUT_TARGETS_PER_TX_CACHE_SIZE = 3000;
/**
* Salt (specified by BIP-330) constructed from contributions from both peers. It is used
* to compute transaction short IDs, which are then used to construct a sketch representing a set
Expand Down Expand Up @@ -90,6 +94,19 @@ class TxReconciliationTracker::Impl
*/
std::unordered_map<NodeId, std::variant<uint64_t, TxReconciliationState>> m_states GUARDED_BY(m_txreconciliation_mutex);

/*
* A least-recently-added cache tracking which peers we should fanout a transaction to.
*
* Since the time between cache accesses is on the order of seconds, returning an outdated
* set of peers is not a concern (especially since we fanout to outbound peers, which should
* be hard to manipulate).
*
* No need to use LRU (bump transaction order upon access) because in most cases
* transactions are processed almost-sequentially.
*/
std::deque<Wtxid> tx_fanout_targets_cache_order;
std::map<Wtxid, std::set<NodeId>> tx_fanout_targets_cache_data GUARDED_BY(m_txreconciliation_mutex);

public:
explicit Impl(uint32_t recon_version) : m_recon_version(recon_version) {}

Expand Down Expand Up @@ -204,10 +221,19 @@ class TxReconciliationTracker::Impl
return IsPeerRegistered(peer_id);
}

bool IsFanoutTarget(const CSipHasher& deterministic_randomizer_with_wtxid,
bool IsFanoutTarget(CSipHasher&& deterministic_randomizer,
bool we_initiate, double limit,
NodeId peer_id) const EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex)
NodeId peer_id, const Wtxid& wtxid) EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex)
{
auto fanout_candidates = tx_fanout_targets_cache_data.find(wtxid);
if (fanout_candidates != tx_fanout_targets_cache_data.end()) {
return fanout_candidates->second.find(peer_id) != fanout_candidates->second.end();
}

// We use the pre-determined randomness to give a consistent result per transaction,
// thus making sure that no transaction gets "unlucky" if every per-peer roll fails.
deterministic_randomizer.Write(wtxid.ToUint256());

// The algorithm works as follows. We iterate through the peers (of a given direction)
// hashing them with the given wtxid, and sort them by this hash.
// We then consider top `limit` peers to be low-fanout flood targets.
Expand All @@ -217,15 +243,15 @@ class TxReconciliationTracker::Impl
// The fractional part of `limit` is stored in the lower 32 bits, and then we check
// whether adding a random lower 32-bit value (first element) would end up modifying
// the higher bits.
const size_t targets_size = ((deterministic_randomizer_with_wtxid.Finalize() & 0xFFFFFFFF) + uint64_t(limit * 0x100000000)) >> 32;
const size_t targets_size = ((deterministic_randomizer.Finalize() & 0xFFFFFFFF) + uint64_t(limit * 0x100000000)) >> 32;

std::vector<std::pair<uint64_t, NodeId>> best_peers;
best_peers.reserve(m_states.size());

for (const auto& indexed_state : m_states) {
const auto cur_state = std::get_if<TxReconciliationState>(&indexed_state.second);
if (cur_state && cur_state->m_we_initiate == we_initiate) {
uint64_t hash_key = CSipHasher(deterministic_randomizer_with_wtxid).Write(cur_state->m_k0).Finalize();
uint64_t hash_key = CSipHasher(deterministic_randomizer).Write(cur_state->m_k0).Finalize();
best_peers.emplace_back(hash_key, indexed_state.first);
}
}
Expand All @@ -235,16 +261,27 @@ class TxReconciliationTracker::Impl
};
std::sort(best_peers.begin(), best_peers.end(), cmp_by_key);

std::set<NodeId> new_fanout_candidates;
auto it = best_peers.begin();
for (size_t i = 0; i < targets_size && it != best_peers.end(); ++i, ++it) {
if (it->second == peer_id) return true;
new_fanout_candidates.insert(it->second);
}
return false;

tx_fanout_targets_cache_data.emplace(wtxid, new_fanout_candidates);

// If the cache is full, make room for the new entry.
if (tx_fanout_targets_cache_order.size () == FANOUT_TARGETS_PER_TX_CACHE_SIZE) {
auto expired_tx = tx_fanout_targets_cache_order.front();
tx_fanout_targets_cache_data.erase(expired_tx);
tx_fanout_targets_cache_order.pop_front();
}
tx_fanout_targets_cache_order.push_back(wtxid);
return new_fanout_candidates.find(peer_id) != new_fanout_candidates.end();
}

bool ShouldFanoutTo(const Wtxid& wtxid, CSipHasher&& deterministic_randomizer, NodeId peer_id,
size_t inbounds_nonrcncl_tx_relay, size_t outbounds_nonrcncl_tx_relay)
const EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
{
AssertLockNotHeld(m_txreconciliation_mutex);
LOCK(m_txreconciliation_mutex);
Expand Down Expand Up @@ -281,10 +318,7 @@ class TxReconciliationTracker::Impl
return false;
}

// We use the pre-determined randomness to give a consistent result per transaction,
// thus making sure that no transaction gets "unlucky" if every per-peer roll fails.
deterministic_randomizer.Write(wtxid.ToUint256());
return IsFanoutTarget(std::move(deterministic_randomizer), recon_state.m_we_initiate, destinations, peer_id);
return IsFanoutTarget(std::move(deterministic_randomizer), recon_state.m_we_initiate, destinations, peer_id, wtxid);
}
};

Expand Down Expand Up @@ -323,8 +357,8 @@ bool TxReconciliationTracker::IsPeerRegistered(NodeId peer_id) const
return m_impl->IsPeerRegisteredExternal(peer_id);
}

bool TxReconciliationTracker::ShouldFanoutTo(const Wtxid& wtxid, CSipHasher&& deterministic_randomizer, NodeId peer_id,
size_t inbounds_nonrcncl_tx_relay, size_t outbounds_nonrcncl_tx_relay) const
bool TxReconciliationTracker::ShouldFanoutTo(const Wtxid& wtxid, CSipHasher deterministic_randomizer, NodeId peer_id,
size_t inbounds_nonrcncl_tx_relay, size_t outbounds_nonrcncl_tx_relay)
{
return m_impl->ShouldFanoutTo(wtxid, std::move(deterministic_randomizer), peer_id,
inbounds_nonrcncl_tx_relay, outbounds_nonrcncl_tx_relay);
Expand Down
4 changes: 2 additions & 2 deletions src/node/txreconciliation.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ class TxReconciliationTracker
/**
* Returns whether the peer is chosen as a low-fanout destination for a given tx.
*/
bool ShouldFanoutTo(const Wtxid& wtxid, CSipHasher&& deterministic_randomizer, NodeId peer_id,
size_t inbounds_nonrcncl_tx_relay, size_t outbounds_nonrcncl_tx_relay) const;
bool ShouldFanoutTo(const Wtxid& wtxid, CSipHasher deterministic_randomizer, NodeId peer_id,
size_t inbounds_nonrcncl_tx_relay, size_t outbounds_nonrcncl_tx_relay);
};

#endif // BITCOIN_NODE_TXRECONCILIATION_H

0 comments on commit b54d8ec

Please sign in to comment.