diff --git a/src/blockchain/fork_choice.cpp b/src/blockchain/fork_choice.cpp index cd5382a8..4def0d6c 100644 --- a/src/blockchain/fork_choice.cpp +++ b/src/blockchain/fork_choice.cpp @@ -73,11 +73,11 @@ namespace lean { if (not blocks_.contains(block_hash)) { return std::nullopt; } - return blocks_.at(block_hash).slot; + return blocks_.at(block_hash).message.block.slot; } Slot ForkChoiceStore::getHeadSlot() const { - return blocks_.at(head_).slot; + return blocks_.at(head_).message.block.slot; } const Config &ForkChoiceStore::getConfig() const { @@ -100,8 +100,10 @@ namespace lean { // If there is no very recent safe target, then vote for the k'th ancestor // of the head for (auto i = 0; i < JUSTIFICATION_LOOKBACK_SLOTS; ++i) { - if (blocks_.at(target_block_root).slot > blocks_.at(safe_target_).slot) { - target_block_root = blocks_.at(target_block_root).parent_root; + if (blocks_.at(target_block_root).message.block.slot + > blocks_.at(safe_target_).message.block.slot) { + target_block_root = + blocks_.at(target_block_root).message.block.parent_root; } else { break; } @@ -109,19 +111,24 @@ namespace lean { // If the latest finalized slot is very far back, then only some slots are // valid to justify, make sure the target is one of those - while (not isJustifiableSlot(latest_finalized_.slot, - blocks_.at(target_block_root).slot)) { - target_block_root = blocks_.at(target_block_root).parent_root; + while (not isJustifiableSlot( + latest_finalized_.slot, + blocks_.at(target_block_root).message.block.slot)) { + target_block_root = + blocks_.at(target_block_root).message.block.parent_root; } return Checkpoint{ .root = target_block_root, - .slot = blocks_.at(target_block_root).slot, + .slot = blocks_.at(target_block_root).message.block.slot, }; } AttestationData ForkChoiceStore::produceAttestationData(Slot slot) const { - Checkpoint head_checkpoint{.root = head_, .slot = blocks_.at(head_).slot}; + Checkpoint head_checkpoint{ + .root = head_, + .slot = blocks_.at(head_).message.block.slot, + }; auto target_checkpoint = getAttestationTarget(); @@ -277,8 +284,8 @@ namespace lean { // Consistency Check // // Validate checkpoint slots match block slots - auto &source_block = blocks_.at(data.source.root); - auto &target_block = blocks_.at(data.target.root); + auto &source_block = blocks_.at(data.source.root).message.block; + auto &target_block = blocks_.at(data.target.root).message.block; if (source_block.slot != data.source.slot) { return Error::INVALID_ATTESTATION; } @@ -507,7 +514,7 @@ namespace lean { latest_finalized_ = post_state.latest_finalized; } - blocks_.emplace(block_hash, block); + blocks_.emplace(block_hash, signed_block_with_attestation); states_.emplace(block_hash, std::move(post_state)); // Process block body attestations @@ -681,7 +688,8 @@ namespace lean { anchor = std::min_element(blocks_.begin(), blocks_.end(), [](const auto &lhs, const auto &rhs) { - return lhs.second.slot < rhs.second.slot; + return lhs.second.message.block.slot + < rhs.second.message.block.slot; }) ->first; } @@ -689,7 +697,7 @@ namespace lean { // Remember the slot of the anchor once and reuse it during the walk. // // This avoids repeated lookups inside the inner loop. - const auto start_slot = blocks_.at(anchor).slot; + const auto start_slot = blocks_.at(anchor).message.block.slot; // Prepare a table that will collect voting weight for each block. // @@ -711,9 +719,9 @@ namespace lean { // // This naturally handles partial views and ongoing sync. while (blocks_.contains(current) - and blocks_.at(current).slot > start_slot) { + and blocks_.at(current).message.block.slot > start_slot) { ++weights[current]; - current = blocks_.at(current).parent_root; + current = blocks_.at(current).message.block.parent_root; } } @@ -724,7 +732,7 @@ namespace lean { for (auto &[hash, block] : blocks_) { // 1. Structural check: skip blocks without parents (e.g., purely // genesis/orphans) - if (block.parent_root == BlockHash{}) { + if (block.message.block.parent_root == BlockHash{}) { continue; } @@ -733,7 +741,7 @@ namespace lean { continue; } - children_map[block.parent_root].push_back(hash); + children_map[block.message.block.parent_root].push_back(hash); } // Now perform the greedy walk. @@ -816,7 +824,10 @@ namespace lean { latest_justified_ = Checkpoint::from(anchor_block); latest_finalized_ = Checkpoint::from(anchor_block); - blocks_.emplace(anchor_root, std::move(anchor_block)); + blocks_.emplace(anchor_root, + SignedBlockWithAttestation{ + .message = {.block = std::move(anchor_block)}, + }); SL_INFO( logger_, "Anchor block {} at slot {}", anchor_root, anchor_block.slot); states_.emplace(anchor_root, anchor_state); diff --git a/src/blockchain/fork_choice.hpp b/src/blockchain/fork_choice.hpp index cf2695e6..4bc68ebb 100644 --- a/src/blockchain/fork_choice.hpp +++ b/src/blockchain/fork_choice.hpp @@ -55,7 +55,7 @@ namespace lean { */ class ForkChoiceStore { public: - using Blocks = std::unordered_map; + using Blocks = std::unordered_map; using SignedAttestations = std::unordered_map; diff --git a/src/blockchain/impl/fc_block_tree.cpp b/src/blockchain/impl/fc_block_tree.cpp index 8b179a39..8fd212bb 100644 --- a/src/blockchain/impl/fc_block_tree.cpp +++ b/src/blockchain/impl/fc_block_tree.cpp @@ -111,8 +111,7 @@ namespace lean::blockchain { if (it == blocks.end()) { return std::nullopt; } - // TODO(turuslan): signature - return SignedBlockWithAttestation{.message = it->second}; + return it->second; } void FCBlockTree::import(std::vector blocks) {} diff --git a/src/modules/networking/networking.cpp b/src/modules/networking/networking.cpp index 4c3d0561..68816a10 100644 --- a/src/modules/networking/networking.cpp +++ b/src/modules/networking/networking.cpp @@ -10,7 +10,6 @@ #include #include #include -#include #include #include @@ -19,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -39,6 +39,10 @@ #include "modules/networking/types.hpp" namespace lean::modules { + constexpr std::chrono::seconds kConnectToPeersTimer{5}; + constexpr std::chrono::milliseconds kInitBackoff = std::chrono::seconds{10}; + constexpr std::chrono::milliseconds kMaxBackoff = std::chrono::minutes{5}; + inline auto gossipTopic(std::string_view type) { return std::format("/leanconsensus/devnet0/{}/ssz_snappy", type); } @@ -81,7 +85,8 @@ namespace lean::modules { block_tree_{std::move(block_tree)}, fork_choice_store_{std::move(fork_choice_store)}, chain_spec_{std::move(chain_spec)}, - config_{std::move(config)} { + config_{std::move(config)}, + random_{std::random_device{}()} { libp2p::log::setLoggingSystem(logging_system->getSoralog()); block_tree_ = std::make_shared(fork_choice_store_); } @@ -124,6 +129,7 @@ namespace lean::modules { io_context_ = injector->create>(); auto host = injector->create>(); + host_ = host; bool has_enr_listen_address = false; const auto &bootnodes = chain_spec_->getBootnodes(); @@ -175,29 +181,32 @@ namespace lean::modules { // Collect candidates (exclude ourselves) auto all_bootnodes = bootnodes.getBootnodes(); - std::vector candidates; - candidates.reserve(all_bootnodes.size()); - for (const auto &b : all_bootnodes) { - if (b.peer_id == peer_id) { + connectable_peers_.reserve(all_bootnodes.size()); + for (auto &bootnode : all_bootnodes) { + if (bootnode.peer_id == peer_id) { continue; } - candidates.push_back(b); - } - - // Randomly choose no more than `maxBootnodes` bootnodes to connect to. - if (config_->maxBootnodes().has_value() - and candidates.size() > *config_->maxBootnodes()) { - std::ranges::shuffle(candidates, std::default_random_engine{}); - // `resize` doesn't work without default constructor - candidates.erase(candidates.begin() + *config_->maxBootnodes(), - candidates.end()); + connectable_peers_.emplace_back(bootnode.peer_id); + peer_states_.emplace( + bootnode.peer_id, + PeerState{ + .info = + { + .id = bootnode.peer_id, + .addresses = {bootnode.address}, + }, + .state = PeerState::Connectable{.backoff = kInitBackoff}, + }); } SL_INFO(logger_, "Adding {} bootnodes to address repository", - candidates.size()); + connectable_peers_.size()); - for (const auto &bootnode : candidates) { + for (const auto &bootnode : all_bootnodes) { + if (bootnode.peer_id == peer_id) { + continue; + } std::vector addresses{bootnode.address}; // Add bootnode addresses with permanent TTL @@ -217,25 +226,16 @@ namespace lean::modules { bootnode.peer_id, result.error()); } - libp2p::PeerInfo peer_info{.id = bootnode.peer_id, - .addresses = addresses}; - libp2p::coroSpawn(*io_context_, - [weak_self{weak_from_this()}, - host, - peer_info]() -> libp2p::Coro { - if (auto r = co_await host->connect(peer_info); - not r.has_value()) { - auto self = weak_self.lock(); - if (not self) { - co_return; - } - SL_WARN(self->logger_, - "connect {} error: {}", - peer_info.id, - r.error()); - } - }); } + libp2p::timerLoop( + *io_context_, kConnectToPeersTimer, [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (not self) { + return false; + } + self->connectToPeers(); + return true; + }); } else { SL_DEBUG(logger_, "No bootnodes configured"); } @@ -254,6 +254,24 @@ namespace lean::modules { return; } auto peer_id = connection->remotePeer(); + auto state_it = self->peer_states_.find(peer_id); + if (state_it != self->peer_states_.end()) { + auto &state = state_it->second; + if (not std::holds_alternative(state.state)) { + if (std::holds_alternative(state.state)) { + // Connectable => Connected + auto connectable_it = + std::ranges::find(self->connectable_peers_, peer_id); + if (connectable_it == self->connectable_peers_.end()) { + throw std::logic_error{ + "inconsistent connectable_peers_ and peer_states_"}; + } + self->connectable_peers_.erase(connectable_it); + } + // Connectable | Connecting | Backoff => Connected + state.state = PeerState::Connected{}; + } + } self->loader_.dispatch_peer_connected( qtils::toSharedPtr(messages::PeerConnectedMessage{peer_id})); if (connection->isInitiator()) { @@ -295,6 +313,18 @@ namespace lean::modules { if (not self) { return; } + auto state_it = self->peer_states_.find(peer_id); + if (state_it != self->peer_states_.end()) { + auto &state = state_it->second; + if (std::holds_alternative(state.state)) { + auto backoff = kInitBackoff; + // Connected => Backoff + state.state = PeerState::Backoff{ + .backoff = backoff, + .backoff_until = Clock::now() + backoff, + }; + } + } self->loader_.dispatch_peer_disconnected( qtils::toSharedPtr(messages::PeerDisconnectedMessage{peer_id})); }; @@ -344,7 +374,8 @@ namespace lean::modules { gossip_blocks_topic_ = gossipSubscribe( "block", [weak_self{weak_from_this()}]( - SignedBlockWithAttestation &&signed_block_with_attestation, std::optional received_from) { + SignedBlockWithAttestation &&signed_block_with_attestation, + std::optional received_from) { auto self = weak_self.lock(); if (not self) { return; @@ -547,4 +578,68 @@ namespace lean::modules { } return slot_hash.slot > block_tree_->lastFinalized().slot; } + + void NetworkingImpl::connectToPeers() { + auto now = Clock::now(); + auto want = peer_states_.size(); + if (auto &limit = config_->maxBootnodes()) { + want = std::min(want, *limit); + } + size_t active = 0; + for (auto &state : peer_states_ | std::views::values) { + if (std::holds_alternative(state.state) + or std::holds_alternative(state.state)) { + ++active; + } else if (auto *backoff = + std::get_if(&state.state)) { + if (backoff->backoff_until <= now) { + // Backoff => Connectable + state.state = PeerState::Connectable{.backoff = backoff->backoff}; + connectable_peers_.emplace_back(state.info.id); + } + } + } + if (want <= active) { + return; + } + want -= active; + while (want != 0 and not connectable_peers_.empty()) { + --want; + size_t i = std::uniform_int_distribution{ + 0, connectable_peers_.size() - 1}(random_); + std::swap(connectable_peers_.at(i), connectable_peers_.back()); + auto peer_id = std::move(connectable_peers_.back()); + connectable_peers_.pop_back(); + auto &state = peer_states_.at(peer_id); + auto &connectable = std::get(state.state); + // Connectable => Connecting + state.state = PeerState::Connecting{.backoff = connectable.backoff}; + libp2p::coroSpawn( + *io_context_, + [weak_self{weak_from_this()}, + host{host_}, + peer_info{state.info}]() -> libp2p::Coro { + auto r = co_await host->connect(peer_info); + auto self = weak_self.lock(); + if (not self) { + co_return; + } + auto &state = self->peer_states_.at(peer_info.id); + if (not r.has_value()) { + SL_WARN(self->logger_, + "connect {} error: {}", + peer_info.id, + r.error()); + if (auto *connecting = + std::get_if(&state.state)) { + // Connecting => Backoff + state.state = PeerState::Backoff{ + .backoff = std::min(2 * connecting->backoff, kMaxBackoff), + .backoff_until = Clock::now() + connecting->backoff, + }; + } + } + }); + } + } } // namespace lean::modules diff --git a/src/modules/networking/networking.hpp b/src/modules/networking/networking.hpp index 94a71e6a..02626b7b 100644 --- a/src/modules/networking/networking.hpp +++ b/src/modules/networking/networking.hpp @@ -6,9 +6,11 @@ #pragma once +#include #include #include +#include #include #include #include @@ -19,6 +21,10 @@ namespace boost::asio { class io_context; } // namespace boost::asio +namespace libp2p::host { + class BasicHost; +} // namespace libp2p::host + namespace libp2p::protocol { class Ping; class Identify; @@ -46,6 +52,55 @@ namespace lean::modules { class StatusProtocol; class BlockRequestProtocol; + using Clock = std::chrono::steady_clock; + + /** + * Peer information and state. + * Peer can be in connectable list, be connecting, be connected, be backedoff. + */ + struct PeerState { + /** + * Peer not connected. + */ + struct Connectable { + /** + * Backoff to use on next connect failure. + */ + std::chrono::milliseconds backoff; + }; + /** + * Connecting to peer. + */ + struct Connecting { + /** + * Backoff to use on next connect failure. + */ + std::chrono::milliseconds backoff; + }; + /** + * Connected to peer. + */ + struct Connected {}; + /** + * Don't connect + */ + struct Backoff { + /** + * Backoff to use on next connect failure. + */ + std::chrono::milliseconds backoff; + /** + * Won't attempt to connect until this time. + */ + Clock::time_point backoff_until; + }; + /** + * Peer id and addresses to connect to. + */ + libp2p::PeerInfo info; + std::variant state; + }; + /** * Network module. * @@ -90,6 +145,11 @@ namespace lean::modules { void receiveBlock(std::optional peer_id, SignedBlockWithAttestation &&block); bool statusFinalizedIsGood(const BlockIndex &slot_hash); + /** + * Called periodically to connect to more peers if there are not enough + * connections. + */ + void connectToPeers(); NetworkingLoader &loader_; log::Logger logger_; @@ -106,11 +166,21 @@ namespace lean::modules { std::shared_ptr block_request_protocol_; std::shared_ptr gossip_; std::shared_ptr ping_; + std::shared_ptr host_; std::shared_ptr identify_; std::shared_ptr gossip_blocks_topic_; std::shared_ptr gossip_votes_topic_; std::unordered_map block_cache_; std::unordered_multimap block_children_; + std::default_random_engine random_; + /** + * Array of connectable peers to pick random peer from. + */ + std::vector connectable_peers_; + /** + * Bootnode peers states. + */ + std::unordered_map peer_states_; }; } // namespace lean::modules diff --git a/tests/unit/blockchain/fork_choice_test.cpp b/tests/unit/blockchain/fork_choice_test.cpp index 221346f3..e83aa445 100644 --- a/tests/unit/blockchain/fork_choice_test.cpp +++ b/tests/unit/blockchain/fork_choice_test.cpp @@ -121,7 +121,8 @@ auto makeBlockMap(std::vector blocks) { ForkChoiceStore::Blocks map; for (auto block : blocks) { block.setHash(); - map.emplace(block.hash(), block); + map.emplace(block.hash(), + lean::SignedBlockWithAttestation{.message = {.block = block}}); } return map; } diff --git a/vcpkg-overlay/sszpp/vcpkg.patch b/vcpkg-overlay/sszpp/vcpkg.patch index 0aee661b..2a8b4f7a 100644 --- a/vcpkg-overlay/sszpp/vcpkg.patch +++ b/vcpkg-overlay/sszpp/vcpkg.patch @@ -120,10 +120,10 @@ index 3174735..b086711 100644 } diff --git a/lib/container.hpp b/lib/container.hpp -index dc31773..5905830 100644 +index dc31773..8a4123f 100644 --- a/lib/container.hpp +++ b/lib/container.hpp -@@ -31,27 +31,59 @@ +@@ -31,27 +31,77 @@ namespace ssz { struct ssz_container { @@ -131,6 +131,15 @@ index dc31773..5905830 100644 - constexpr bool operator==(const ssz_container &rhs) const noexcept = default; + struct variable_size : std::false_type {}; + ++ ssz_container() = default; ++ ++ template ++ requires std::is_same_v ++ ssz_container(const T &) {} ++ template ++ requires (not std::is_same_v) ++ ssz_container(const T &) = delete; ++ + template + requires std::is_same_v and std::is_same_v + friend constexpr auto operator<=>(const T1&, const T2&) { @@ -147,10 +156,19 @@ index dc31773..5905830 100644 struct ssz_variable_size_container : ssz_container { struct variable_size : std::true_type {}; + ++ ssz_variable_size_container() = default; ++ ++ template ++ requires std::is_same_v ++ ssz_variable_size_container(const T &) {} ++ template ++ requires (not std::is_same_v) ++ ssz_variable_size_container(const T &) = delete; ++ + template + requires std::is_same_v and std::is_same_v + friend constexpr auto operator<=>(const T1&, const T2&) { -+ return 0 <=> 0; ++ return std::strong_ordering::equal; + } + + template @@ -193,7 +211,7 @@ index dc31773..5905830 100644 } #ifdef HAVE_YAML #define YAML_CONT(...) \ -@@ -75,7 +107,7 @@ constexpr std::uint32_t compute_total_length(const ssz_object auto &...members) +@@ -75,7 +125,7 @@ constexpr std::uint32_t compute_total_length(const ssz_object auto &...members) return (... + size_plus_placeholder(members)); } @@ -202,7 +220,7 @@ index dc31773..5905830 100644 auto fsize = compute_fixed_length(members...); auto variable = result + fsize; auto begin = result; -@@ -174,7 +206,7 @@ namespace _detail { +@@ -174,7 +224,7 @@ namespace _detail { auto _decode_member = [](const YAML::Node &node, yaml_pair auto pair) { return YAML::convert>::decode(node[pair.first], pair.second); };