Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 112 additions & 36 deletions src/modules/networking/networking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include <algorithm>
#include <format>
#include <memory>
#include <random>
#include <stdexcept>

#include <app/configuration.hpp>
Expand All @@ -19,6 +18,7 @@
#include <libp2p/basic/read_varint.hpp>
#include <libp2p/basic/write_varint.hpp>
#include <libp2p/coro/spawn.hpp>
#include <libp2p/coro/timer_loop.hpp>
#include <libp2p/crypto/key_marshaller.hpp>
#include <libp2p/crypto/sha/sha256.hpp>
#include <libp2p/host/basic_host.hpp>
Expand All @@ -39,6 +39,10 @@
#include "modules/networking/types.hpp"

namespace lean::modules {
constexpr std::chrono::seconds kConnectToPeersTimer{5};
constexpr std::chrono::milliseconds kInitBackoff = std::chrono::seconds{10};
constexpr std::chrono::milliseconds kMaxBackoff = std::chrono::minutes{5};

inline auto gossipTopic(std::string_view type) {
return std::format("/leanconsensus/devnet0/{}/ssz_snappy", type);
}
Expand Down Expand Up @@ -124,6 +128,7 @@ namespace lean::modules {
io_context_ = injector->create<std::shared_ptr<boost::asio::io_context>>();

auto host = injector->create<std::shared_ptr<libp2p::host::BasicHost>>();
host_ = host;

bool has_enr_listen_address = false;
const auto &bootnodes = chain_spec_->getBootnodes();
Expand Down Expand Up @@ -175,29 +180,32 @@ namespace lean::modules {

// Collect candidates (exclude ourselves)
auto all_bootnodes = bootnodes.getBootnodes();
std::vector<decltype(all_bootnodes)::value_type> candidates;
candidates.reserve(all_bootnodes.size());
for (const auto &b : all_bootnodes) {
if (b.peer_id == peer_id) {
connectable_peers_.reserve(all_bootnodes.size());
for (auto &bootnode : all_bootnodes) {
if (bootnode.peer_id == peer_id) {
continue;
}
candidates.push_back(b);
}

// Randomly choose no more than `maxBootnodes` bootnodes to connect to.
if (config_->maxBootnodes().has_value()
and candidates.size() > *config_->maxBootnodes()) {
std::ranges::shuffle(candidates, std::default_random_engine{});
// `resize` doesn't work without default constructor
candidates.erase(candidates.begin() + *config_->maxBootnodes(),
candidates.end());
connectable_peers_.emplace_back(bootnode.peer_id);
peer_states_.emplace(
bootnode.peer_id,
PeerState{
.info =
{
.id = bootnode.peer_id,
.addresses = {bootnode.address},
},
.state = PeerState::Connectable{.backoff = kInitBackoff},
});
}

SL_INFO(logger_,
"Adding {} bootnodes to address repository",
candidates.size());
connectable_peers_.size());

for (const auto &bootnode : candidates) {
for (const auto &bootnode : all_bootnodes) {
if (bootnode.peer_id == peer_id) {
continue;
}
std::vector<libp2p::multi::Multiaddress> addresses{bootnode.address};

// Add bootnode addresses with permanent TTL
Expand All @@ -217,25 +225,16 @@ namespace lean::modules {
bootnode.peer_id,
result.error());
}
libp2p::PeerInfo peer_info{.id = bootnode.peer_id,
.addresses = addresses};
libp2p::coroSpawn(*io_context_,
[weak_self{weak_from_this()},
host,
peer_info]() -> libp2p::Coro<void> {
if (auto r = co_await host->connect(peer_info);
not r.has_value()) {
auto self = weak_self.lock();
if (not self) {
co_return;
}
SL_WARN(self->logger_,
"connect {} error: {}",
peer_info.id,
r.error());
}
});
}
libp2p::timerLoop(
*io_context_, kConnectToPeersTimer, [weak_self{weak_from_this()}] {
auto self = weak_self.lock();
if (not self) {
return false;
}
self->connectToPeers();
return true;
});
} else {
SL_DEBUG(logger_, "No bootnodes configured");
}
Expand Down Expand Up @@ -295,6 +294,18 @@ namespace lean::modules {
if (not self) {
return;
}
auto state_it = self->peer_states_.find(peer_id);
if (state_it != self->peer_states_.end()) {
auto &state = state_it->second;
if (std::holds_alternative<PeerState::Connected>(state.state)) {
auto backoff = kInitBackoff;
// Connected => Backoff
state.state = PeerState::Backoff{
.backoff = backoff,
.backoff_until = Clock::now() + backoff,
};
}
}
self->loader_.dispatch_peer_disconnected(
qtils::toSharedPtr(messages::PeerDisconnectedMessage{peer_id}));
};
Expand Down Expand Up @@ -344,7 +355,8 @@ namespace lean::modules {
gossip_blocks_topic_ = gossipSubscribe<SignedBlockWithAttestation>(
"block",
[weak_self{weak_from_this()}](
SignedBlockWithAttestation &&signed_block_with_attestation, std::optional<libp2p::PeerId> received_from) {
SignedBlockWithAttestation &&signed_block_with_attestation,
std::optional<libp2p::PeerId> received_from) {
auto self = weak_self.lock();
if (not self) {
return;
Expand Down Expand Up @@ -547,4 +559,68 @@ namespace lean::modules {
}
return slot_hash.slot > block_tree_->lastFinalized().slot;
}

void NetworkingImpl::connectToPeers() {
auto now = Clock::now();
auto want = peer_states_.size();
if (auto &limit = config_->maxBootnodes()) {
want = std::min(want, *limit);
}
size_t active = 0;
for (auto &state : peer_states_ | std::views::values) {
if (std::holds_alternative<PeerState::Connecting>(state.state)
or std::holds_alternative<PeerState::Connected>(state.state)) {
++active;
} else if (auto *backoff =
std::get_if<PeerState::Backoff>(&state.state)) {
if (backoff->backoff_until <= now) {
// Backoff => Connectable
state.state = PeerState::Connectable{.backoff = backoff->backoff};
connectable_peers_.emplace_back(state.info.id);
}
}
}
if (want <= active) {
return;
}
want -= active;
while (want != 0 and not connectable_peers_.empty()) {
size_t i = std::uniform_int_distribution<size_t>{
0, connectable_peers_.size() - 1}(random_);
std::swap(connectable_peers_.at(i), connectable_peers_.back());
auto peer_id = std::move(connectable_peers_.back());
connectable_peers_.pop_back();
auto &state = peer_states_.at(peer_id);
auto &connectable = std::get<PeerState::Connectable>(state.state);
// Connectable => Connecting
state.state = PeerState::Connecting{.backoff = connectable.backoff};
libp2p::coroSpawn(
*io_context_,
[weak_self{weak_from_this()},
host{host_},
&state,
peer_info{state.info}]() -> libp2p::Coro<void> {
auto r = co_await host->connect(peer_info);
auto self = weak_self.lock();
if (not self) {
co_return;
}
auto &connecting = std::get<PeerState::Connecting>(state.state);
if (r.has_value()) {
// Connecting => Connected
state.state = PeerState::Connected{};
} else {
SL_WARN(self->logger_,
"connect {} error: {}",
peer_info.id,
r.error());
// Connecting => Backoff
state.state = PeerState::Backoff{
.backoff = std::min(2 * connecting.backoff, kMaxBackoff),
.backoff_until = Clock::now() + connecting.backoff,
};
}
});
}
}
} // namespace lean::modules
29 changes: 29 additions & 0 deletions src/modules/networking/networking.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

#pragma once

#include <random>
#include <thread>

#include <libp2p/event/bus.hpp>
#include <libp2p/peer/peer_info.hpp>
#include <log/logger.hpp>
#include <modules/networking/interfaces.hpp>
#include <qtils/create_smart_pointer_macros.hpp>
Expand All @@ -19,6 +21,10 @@ namespace boost::asio {
class io_context;
} // namespace boost::asio

namespace libp2p::host {
class BasicHost;
} // namespace libp2p::host

namespace libp2p::protocol {
class Ping;
class Identify;
Expand Down Expand Up @@ -46,6 +52,24 @@ namespace lean::modules {
class StatusProtocol;
class BlockRequestProtocol;

using Clock = std::chrono::steady_clock;

struct PeerState {
struct Connectable {
std::chrono::milliseconds backoff;
};
struct Connecting {
std::chrono::milliseconds backoff;
};
struct Connected {};
struct Backoff {
std::chrono::milliseconds backoff;
Clock::time_point backoff_until;
};
libp2p::PeerInfo info;
std::variant<Connectable, Connecting, Connected, Backoff> state;
};

/**
* Network module.
*
Expand Down Expand Up @@ -90,6 +114,7 @@ namespace lean::modules {
void receiveBlock(std::optional<libp2p::PeerId> peer_id,
SignedBlockWithAttestation &&block);
bool statusFinalizedIsGood(const BlockIndex &slot_hash);
void connectToPeers();

NetworkingLoader &loader_;
log::Logger logger_;
Expand All @@ -106,11 +131,15 @@ namespace lean::modules {
std::shared_ptr<BlockRequestProtocol> block_request_protocol_;
std::shared_ptr<libp2p::protocol::gossip::Gossip> gossip_;
std::shared_ptr<libp2p::protocol::Ping> ping_;
std::shared_ptr<libp2p::host::BasicHost> host_;
std::shared_ptr<libp2p::protocol::Identify> identify_;
std::shared_ptr<libp2p::protocol::gossip::Topic> gossip_blocks_topic_;
std::shared_ptr<libp2p::protocol::gossip::Topic> gossip_votes_topic_;
std::unordered_map<BlockHash, SignedBlockWithAttestation> block_cache_;
std::unordered_multimap<BlockHash, BlockHash> block_children_;
std::default_random_engine random_;
std::vector<libp2p::PeerId> connectable_peers_;
std::unordered_map<libp2p::PeerId, PeerState> peer_states_;
};

} // namespace lean::modules
Loading