Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 132 additions & 37 deletions src/modules/networking/networking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include <algorithm>
#include <format>
#include <memory>
#include <random>
#include <stdexcept>

#include <app/configuration.hpp>
Expand All @@ -19,6 +18,7 @@
#include <libp2p/basic/read_varint.hpp>
#include <libp2p/basic/write_varint.hpp>
#include <libp2p/coro/spawn.hpp>
#include <libp2p/coro/timer_loop.hpp>
#include <libp2p/crypto/key_marshaller.hpp>
#include <libp2p/crypto/sha/sha256.hpp>
#include <libp2p/host/basic_host.hpp>
Expand All @@ -39,6 +39,10 @@
#include "modules/networking/types.hpp"

namespace lean::modules {
constexpr std::chrono::seconds kConnectToPeersTimer{5};
constexpr std::chrono::milliseconds kInitBackoff = std::chrono::seconds{10};
constexpr std::chrono::milliseconds kMaxBackoff = std::chrono::minutes{5};

inline auto gossipTopic(std::string_view type) {
return std::format("/leanconsensus/devnet0/{}/ssz_snappy", type);
}
Expand Down Expand Up @@ -81,7 +85,8 @@ namespace lean::modules {
block_tree_{std::move(block_tree)},
fork_choice_store_{std::move(fork_choice_store)},
chain_spec_{std::move(chain_spec)},
config_{std::move(config)} {
config_{std::move(config)},
random_{std::random_device{}()} {
libp2p::log::setLoggingSystem(logging_system->getSoralog());
block_tree_ = std::make_shared<blockchain::FCBlockTree>(fork_choice_store_);
}
Expand Down Expand Up @@ -124,6 +129,7 @@ namespace lean::modules {
io_context_ = injector->create<std::shared_ptr<boost::asio::io_context>>();

auto host = injector->create<std::shared_ptr<libp2p::host::BasicHost>>();
host_ = host;

bool has_enr_listen_address = false;
const auto &bootnodes = chain_spec_->getBootnodes();
Expand Down Expand Up @@ -175,29 +181,32 @@ namespace lean::modules {

// Collect candidates (exclude ourselves)
auto all_bootnodes = bootnodes.getBootnodes();
std::vector<decltype(all_bootnodes)::value_type> candidates;
candidates.reserve(all_bootnodes.size());
for (const auto &b : all_bootnodes) {
if (b.peer_id == peer_id) {
connectable_peers_.reserve(all_bootnodes.size());
for (auto &bootnode : all_bootnodes) {
if (bootnode.peer_id == peer_id) {
continue;
}
candidates.push_back(b);
}

// Randomly choose no more than `maxBootnodes` bootnodes to connect to.
if (config_->maxBootnodes().has_value()
and candidates.size() > *config_->maxBootnodes()) {
std::ranges::shuffle(candidates, std::default_random_engine{});
// `resize` doesn't work without default constructor
candidates.erase(candidates.begin() + *config_->maxBootnodes(),
candidates.end());
connectable_peers_.emplace_back(bootnode.peer_id);
peer_states_.emplace(
bootnode.peer_id,
PeerState{
.info =
{
.id = bootnode.peer_id,
.addresses = {bootnode.address},
},
.state = PeerState::Connectable{.backoff = kInitBackoff},
});
}

SL_INFO(logger_,
"Adding {} bootnodes to address repository",
candidates.size());
connectable_peers_.size());

for (const auto &bootnode : candidates) {
for (const auto &bootnode : all_bootnodes) {
if (bootnode.peer_id == peer_id) {
continue;
}
std::vector<libp2p::multi::Multiaddress> addresses{bootnode.address};

// Add bootnode addresses with permanent TTL
Expand All @@ -217,25 +226,16 @@ namespace lean::modules {
bootnode.peer_id,
result.error());
}
libp2p::PeerInfo peer_info{.id = bootnode.peer_id,
.addresses = addresses};
libp2p::coroSpawn(*io_context_,
[weak_self{weak_from_this()},
host,
peer_info]() -> libp2p::Coro<void> {
if (auto r = co_await host->connect(peer_info);
not r.has_value()) {
auto self = weak_self.lock();
if (not self) {
co_return;
}
SL_WARN(self->logger_,
"connect {} error: {}",
peer_info.id,
r.error());
}
});
}
libp2p::timerLoop(
*io_context_, kConnectToPeersTimer, [weak_self{weak_from_this()}] {
auto self = weak_self.lock();
if (not self) {
return false;
}
self->connectToPeers();
return true;
});
} else {
SL_DEBUG(logger_, "No bootnodes configured");
}
Expand All @@ -254,6 +254,24 @@ namespace lean::modules {
return;
}
auto peer_id = connection->remotePeer();
auto state_it = self->peer_states_.find(peer_id);
if (state_it != self->peer_states_.end()) {
auto &state = state_it->second;
if (not std::holds_alternative<PeerState::Connected>(state.state)) {
if (std::holds_alternative<PeerState::Connectable>(state.state)) {
// Connectable => Connected
auto connectable_it =
std::ranges::find(self->connectable_peers_, peer_id);
if (connectable_it == self->connectable_peers_.end()) {
throw std::logic_error{
"inconsistent connectable_peers_ and peer_states_"};
}
self->connectable_peers_.erase(connectable_it);
}
// Connectable | Connecting | Backoff => Connected
state.state = PeerState::Connected{};
}
}
self->loader_.dispatch_peer_connected(
qtils::toSharedPtr(messages::PeerConnectedMessage{peer_id}));
if (connection->isInitiator()) {
Expand Down Expand Up @@ -295,6 +313,18 @@ namespace lean::modules {
if (not self) {
return;
}
auto state_it = self->peer_states_.find(peer_id);
if (state_it != self->peer_states_.end()) {
auto &state = state_it->second;
if (std::holds_alternative<PeerState::Connected>(state.state)) {
auto backoff = kInitBackoff;
// Connected => Backoff
state.state = PeerState::Backoff{
.backoff = backoff,
.backoff_until = Clock::now() + backoff,
};
}
}
self->loader_.dispatch_peer_disconnected(
qtils::toSharedPtr(messages::PeerDisconnectedMessage{peer_id}));
};
Expand Down Expand Up @@ -344,7 +374,8 @@ namespace lean::modules {
gossip_blocks_topic_ = gossipSubscribe<SignedBlockWithAttestation>(
"block",
[weak_self{weak_from_this()}](
SignedBlockWithAttestation &&signed_block_with_attestation, std::optional<libp2p::PeerId> received_from) {
SignedBlockWithAttestation &&signed_block_with_attestation,
std::optional<libp2p::PeerId> received_from) {
auto self = weak_self.lock();
if (not self) {
return;
Expand Down Expand Up @@ -547,4 +578,68 @@ namespace lean::modules {
}
return slot_hash.slot > block_tree_->lastFinalized().slot;
}

void NetworkingImpl::connectToPeers() {
auto now = Clock::now();
auto want = peer_states_.size();
if (auto &limit = config_->maxBootnodes()) {
want = std::min(want, *limit);
}
size_t active = 0;
for (auto &state : peer_states_ | std::views::values) {
if (std::holds_alternative<PeerState::Connecting>(state.state)
or std::holds_alternative<PeerState::Connected>(state.state)) {
++active;
} else if (auto *backoff =
std::get_if<PeerState::Backoff>(&state.state)) {
if (backoff->backoff_until <= now) {
// Backoff => Connectable
state.state = PeerState::Connectable{.backoff = backoff->backoff};
connectable_peers_.emplace_back(state.info.id);
}
}
}
if (want <= active) {
return;
}
want -= active;
while (want != 0 and not connectable_peers_.empty()) {
--want;
size_t i = std::uniform_int_distribution<size_t>{
0, connectable_peers_.size() - 1}(random_);
std::swap(connectable_peers_.at(i), connectable_peers_.back());
auto peer_id = std::move(connectable_peers_.back());
connectable_peers_.pop_back();
auto &state = peer_states_.at(peer_id);
auto &connectable = std::get<PeerState::Connectable>(state.state);
// Connectable => Connecting
state.state = PeerState::Connecting{.backoff = connectable.backoff};
libp2p::coroSpawn(
*io_context_,
[weak_self{weak_from_this()},
host{host_},
peer_info{state.info}]() -> libp2p::Coro<void> {
auto r = co_await host->connect(peer_info);
auto self = weak_self.lock();
if (not self) {
co_return;
}
auto &state = self->peer_states_.at(peer_info.id);
if (not r.has_value()) {
SL_WARN(self->logger_,
"connect {} error: {}",
peer_info.id,
r.error());
if (auto *connecting =
std::get_if<PeerState::Connecting>(&state.state)) {
// Connecting => Backoff
state.state = PeerState::Backoff{
.backoff = std::min(2 * connecting->backoff, kMaxBackoff),
.backoff_until = Clock::now() + connecting->backoff,
};
}
}
});
}
}
} // namespace lean::modules
70 changes: 70 additions & 0 deletions src/modules/networking/networking.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

#pragma once

#include <random>
#include <thread>

#include <libp2p/event/bus.hpp>
#include <libp2p/peer/peer_info.hpp>
#include <log/logger.hpp>
#include <modules/networking/interfaces.hpp>
#include <qtils/create_smart_pointer_macros.hpp>
Expand All @@ -19,6 +21,10 @@ namespace boost::asio {
class io_context;
} // namespace boost::asio

namespace libp2p::host {
class BasicHost;
} // namespace libp2p::host

namespace libp2p::protocol {
class Ping;
class Identify;
Expand Down Expand Up @@ -46,6 +52,55 @@ namespace lean::modules {
class StatusProtocol;
class BlockRequestProtocol;

using Clock = std::chrono::steady_clock;

/**
* Peer information and state.
* Peer can be in connectable list, be connecting, be connected, be backedoff.
*/
struct PeerState {
/**
* Peer not connected.
*/
struct Connectable {
/**
* Backoff to use on next connect failure.
*/
std::chrono::milliseconds backoff;
};
/**
* Connecting to peer.
*/
struct Connecting {
/**
* Backoff to use on next connect failure.
*/
std::chrono::milliseconds backoff;
};
/**
* Connected to peer.
*/
struct Connected {};
/**
* Don't connect
*/
struct Backoff {
/**
* Backoff to use on next connect failure.
*/
std::chrono::milliseconds backoff;
/**
* Won't attempt to connect until this time.
*/
Clock::time_point backoff_until;
};
/**
* Peer id and addresses to connect to.
*/
libp2p::PeerInfo info;
std::variant<Connectable, Connecting, Connected, Backoff> state;
};

/**
* Network module.
*
Expand Down Expand Up @@ -90,6 +145,11 @@ namespace lean::modules {
void receiveBlock(std::optional<libp2p::PeerId> peer_id,
SignedBlockWithAttestation &&block);
bool statusFinalizedIsGood(const BlockIndex &slot_hash);
/**
* Called periodically to connect to more peers if there are not enough
* connections.
*/
void connectToPeers();

NetworkingLoader &loader_;
log::Logger logger_;
Expand All @@ -106,11 +166,21 @@ namespace lean::modules {
std::shared_ptr<BlockRequestProtocol> block_request_protocol_;
std::shared_ptr<libp2p::protocol::gossip::Gossip> gossip_;
std::shared_ptr<libp2p::protocol::Ping> ping_;
std::shared_ptr<libp2p::host::BasicHost> host_;
std::shared_ptr<libp2p::protocol::Identify> identify_;
std::shared_ptr<libp2p::protocol::gossip::Topic> gossip_blocks_topic_;
std::shared_ptr<libp2p::protocol::gossip::Topic> gossip_votes_topic_;
std::unordered_map<BlockHash, SignedBlockWithAttestation> block_cache_;
std::unordered_multimap<BlockHash, BlockHash> block_children_;
std::default_random_engine random_;
/**
* Array of connectable peers to pick random peer from.
*/
std::vector<libp2p::PeerId> connectable_peers_;
/**
* Bootnode peers states.
*/
std::unordered_map<libp2p::PeerId, PeerState> peer_states_;
};

} // namespace lean::modules
Loading