Skip to content

Commit

Permalink
Configure Bootstrap to accept list of boot nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
momosh-ethernal authored Aug 1, 2024
1 parent 2fbed04 commit cb57787
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 14 deletions.
4 changes: 4 additions & 0 deletions bootstrap/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
# Changelog

## [0.2.0]

- Added new configuration parameter `bootstraps` which explicitly adds a list of known listen address of provided bootstraps to the routing table
6 changes: 6 additions & 0 deletions bootstrap/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ async fn run() -> Result<()> {
info!("Started listening for TCP traffic on port: {:?}.", cfg.port);

info!("Bootstrap node starting ...");
// add bootstrap nodes, if provided
if !cfg.bootstraps.is_empty() {
network_client
.add_bootstrap_nodes(cfg.bootstraps.iter().map(Into::into).collect())
.await?;
}
network_client.bootstrap().await?;
info!("Bootstrap done.");
loop_handle.await?;
Expand Down
31 changes: 31 additions & 0 deletions bootstrap/src/p2p/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,32 @@ impl Client {
.context("Sender not to be dropped.")?
}

pub async fn dial_peer(&self, peer_id: PeerId, peer_address: Multiaddr) -> Result<()> {
let (response_sender, response_receiver) = oneshot::channel();
self.command_sender
.send(Command::DialPeer {
peer_id,
peer_address,
response_sender,
})
.await
.context("Command receiver should not be dropped.")?;
response_receiver
.await
.context("Sender not to be dropped.")?
}

pub async fn add_bootstrap_nodes(&self, nodes: Vec<(PeerId, Multiaddr)>) -> Result<()> {
for (peer, addr) in nodes {
self.dial_peer(peer, addr.clone())
.await
.context("Dialing Bootstrap peer failed.")?;
self.add_address(peer, addr.clone()).await?;
}

Ok(())
}

pub async fn bootstrap(&self) -> Result<()> {
// bootstrapping is impossible on an empty DHT table
// at least one node is required to be known, so check
Expand Down Expand Up @@ -103,6 +129,11 @@ pub enum Command {
addr: Multiaddr,
response_sender: oneshot::Sender<Result<()>>,
},
DialPeer {
peer_id: PeerId,
peer_address: Multiaddr,
response_sender: oneshot::Sender<Result<()>>,
},
AddAddress {
peer_id: PeerId,
multiaddr: Multiaddr,
Expand Down
56 changes: 44 additions & 12 deletions bootstrap/src/p2p/event_loop.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::Result;
use anyhow::{anyhow, Result};
use libp2p::{
autonat::{self, InboundProbeEvent, OutboundProbeEvent},
futures::StreamExt,
Expand All @@ -8,7 +8,11 @@ use libp2p::{
swarm::SwarmEvent,
Multiaddr, PeerId, Swarm,
};
use std::{collections::HashMap, str::FromStr, time::Duration};
use std::{
collections::{hash_map, HashMap},
str::FromStr,
time::Duration,
};
use tokio::{
sync::{mpsc, oneshot},
time::{interval_at, Instant, Interval},
Expand All @@ -24,6 +28,7 @@ enum QueryChannel {
}

enum SwarmChannel {
Dial(oneshot::Sender<Result<()>>),
ConnectionEstablished(oneshot::Sender<(PeerId, Multiaddr)>),
}

Expand Down Expand Up @@ -161,7 +166,7 @@ impl EventLoop {
}

if protocols.contains(&self.swarm.behaviour_mut().kademlia.protocol_names()[0]) {
debug!("Adding peer {peer_id} to routing table.");
trace!("Adding peer {peer_id} to routing table.");
for addr in listen_addrs {
self.swarm
.behaviour_mut()
Expand Down Expand Up @@ -221,22 +226,31 @@ impl EventLoop {
error,
} => {
trace!("Outgoing connection error. Connection id: {connection_id}. Peer: {peer_id}. Error: {error}.");

if let Some(SwarmChannel::Dial(ch)) = self.pending_swarm_events.remove(&peer_id) {
_ = ch.send(Err(anyhow!(error)));
}
},
SwarmEvent::ConnectionEstablished {
endpoint, peer_id, ..
} => {
// while waiting for a first successful connection,
// we're interested in a case where we are dialing back
if endpoint.is_dialer() {
// check if there is a command waiting for a response
let local_peer_id = self.swarm.local_peer_id();
if let Some(SwarmChannel::ConnectionEstablished(ch)) =
self.pending_swarm_events.remove(local_peer_id)
{
// signal back that we have successfully established a connection,
// give us back PeerId and Multiaddress
let addr = endpoint.get_remote_address().to_owned();
_ = ch.send((peer_id, addr));
if let Some(event) = self.pending_swarm_events.remove(&peer_id) {
match event {
// check if there is a command waiting for a response for established 1st connection
SwarmChannel::ConnectionEstablished(ch) => {
// signal back that we have successfully established a connection,
// give us back PeerId and Multiaddress
let addr = endpoint.get_remote_address().to_owned();
_ = ch.send((peer_id, addr));
},
SwarmChannel::Dial(ch) => {
// signal back that dial was a success
_ = ch.send(Ok(()));
},
}
}
}
},
Expand All @@ -262,6 +276,24 @@ impl EventLoop {
Err(err) => response_sender.send(Err(err.into())),
}
},
Command::DialPeer {
peer_id,
peer_address,
response_sender,
} => {
if let hash_map::Entry::Vacant(e) = self.pending_swarm_events.entry(peer_id) {
match self.swarm.dial(peer_address.with(Protocol::P2p(peer_id))) {
Ok(()) => {
e.insert(SwarmChannel::Dial(response_sender));
},
Err(e) => {
let _ = response_sender.send(Err(anyhow!(e)));
},
}
} else {
todo!("Already dialing peer.");
}
},
Command::AddAddress {
peer_id,
multiaddr,
Expand Down
43 changes: 41 additions & 2 deletions bootstrap/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Context;
use libp2p::StreamProtocol;
use anyhow::{anyhow, Context, Error};
use libp2p::{Multiaddr, PeerId, StreamProtocol};
use semver::Version;
use serde::{Deserialize, Serialize};
use std::{
Expand All @@ -24,6 +24,42 @@ pub enum SecretKey {
Key { key: String },
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(try_from = "String")]
pub struct CompactMultiaddress((PeerId, Multiaddr));

impl TryFrom<String> for CompactMultiaddress {
type Error = Error;

fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
let Some((_, peer_id)) = value.rsplit_once('/') else {
return Err(anyhow!("Invalid multiaddress string"));
};
let peer_id = PeerId::from_str(peer_id)?;
let multiaddr = Multiaddr::from_str(&value)?;
Ok(CompactMultiaddress((peer_id, multiaddr)))
}
}

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(
untagged,
expecting = "Valid multiaddress/peer_id string or a tuple (peer_id, multiaddress) expected"
)]
pub enum MultiaddrConfig {
Compact(CompactMultiaddress),
PeerIdAndMultiaddr((PeerId, Multiaddr)),
}

impl From<&MultiaddrConfig> for (PeerId, Multiaddr) {
fn from(value: &MultiaddrConfig) -> Self {
match value {
MultiaddrConfig::Compact(CompactMultiaddress(value)) => value.clone(),
MultiaddrConfig::PeerIdAndMultiaddr(value) => value.clone(),
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(default)]
pub struct RuntimeConfig {
Expand Down Expand Up @@ -52,6 +88,8 @@ pub struct RuntimeConfig {
pub autonat_only_global_ips: bool,
/// Sets the timeout for a single Kademlia query. (default: 60s).
pub kad_query_timeout: u32,
/// Vector of Light Client bootstrap nodes, used to bootstrap DHT. If not set, light client acts as a bootstrap node, waiting for first peer to connect for DHT bootstrap (default: empty).
pub bootstraps: Vec<MultiaddrConfig>,
/// Defines a period of time in which periodic bootstraps will be repeated. (default: 300s)
pub bootstrap_period: u64,
/// OpenTelemetry Collector endpoint (default: http://127.0.0.1:4317)
Expand Down Expand Up @@ -151,6 +189,7 @@ impl Default for RuntimeConfig {
autonat_only_global_ips: true,
connection_idle_timeout: 30,
kad_query_timeout: 60,
bootstraps: vec![],
bootstrap_period: 300,
ot_collector_endpoint: "http://127.0.0.1:4317".to_string(),
metrics_network_dump_interval: 15,
Expand Down

0 comments on commit cb57787

Please sign in to comment.