Skip to content

Commit

Permalink
p2p: Improve error handling during handshake:
Browse files Browse the repository at this point in the history
Introduce a new entry status, INCOMPATIBLE_ENTRY. Entries with this
status do not increase the failure attempts; instead, they persist in
the routing table until replaced by a new peer. This feature is useful for
seeding and the lookup process.

Add a boolean value to the VerAck message to indicate whether the
version is accepted or not.
  • Loading branch information
hozan23 committed Nov 22, 2023
1 parent 542897c commit 34b0a91
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 45 deletions.
15 changes: 10 additions & 5 deletions p2p/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use smol::{channel::Sender, lock::Mutex};
use std::{collections::VecDeque, fmt, sync::Arc};

use karyons_core::async_utils::CondVar;
use smol::{channel::Sender, lock::Mutex};

use karyons_core::async_utils::CondVar;
use karyons_net::Conn;

use crate::Result;

/// Defines the direction of a network connection.
#[derive(Clone, Debug)]
pub enum ConnDirection {
Expand All @@ -24,7 +26,7 @@ impl fmt::Display for ConnDirection {
pub struct NewConn {
pub direction: ConnDirection,
pub conn: Conn,
pub disconnect_signal: Sender<()>,
pub disconnect_signal: Sender<Result<()>>,
}

/// Connection queue
Expand All @@ -42,7 +44,7 @@ impl ConnQueue {
}

/// Push a connection into the queue and wait for the disconnect signal
pub async fn handle(&self, conn: Conn, direction: ConnDirection) {
pub async fn handle(&self, conn: Conn, direction: ConnDirection) -> Result<()> {
let (disconnect_signal, chan) = smol::channel::bounded(1);
let new_conn = NewConn {
direction,
Expand All @@ -51,7 +53,10 @@ impl ConnQueue {
};
self.queue.lock().await.push_back(new_conn);
self.conn_available.signal();
let _ = chan.recv().await;
if let Ok(result) = chan.recv().await {
return result;
}
Ok(())
}

/// Receive the next connection in the queue
Expand Down
38 changes: 30 additions & 8 deletions p2p/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use crate::{
listener::Listener,
monitor::Monitor,
routing_table::{
Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY,
UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY,
INCOMPATIBLE_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
},
slots::ConnectionSlots,
Error, PeerID, Result,
Expand Down Expand Up @@ -169,8 +169,8 @@ impl Discovery {
/// Start a listener and on success, return the resolved endpoint.
async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> {
let selfc = self.clone();
let callback = |conn: Conn| async move {
selfc.conn_queue.handle(conn, ConnDirection::Inbound).await;
let callback = |c: Conn| async move {
selfc.conn_queue.handle(c, ConnDirection::Inbound).await?;
Ok(())
};

Expand Down Expand Up @@ -205,12 +205,34 @@ impl Discovery {
/// Connect to the given endpoint using the connector
async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) {
let selfc = self.clone();
let pid_cloned = pid.clone();
let pid_c = pid.clone();
let endpoint_c = endpoint.clone();
let cback = |conn: Conn| async move {
selfc.conn_queue.handle(conn, ConnDirection::Outbound).await;
if let Some(pid) = pid_cloned {
selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
let result = selfc.conn_queue.handle(conn, ConnDirection::Outbound).await;

// If the entry is not in the routing table, ignore the result
let pid = match pid_c {
Some(p) => p,
None => return Ok(()),
};

match result {
Err(Error::IncompatiblePeer) => {
error!("Failed to do handshake: {endpoint_c} incompatible peer");
selfc.update_entry(&pid, INCOMPATIBLE_ENTRY).await;
}
Err(Error::PeerAlreadyConnected) => {
// TODO
selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
}
Err(_) => {
selfc.update_entry(&pid, UNSTABLE_ENTRY).await;
}
Ok(_) => {
selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
}
}

Ok(())
};

Expand Down
2 changes: 1 addition & 1 deletion p2p/src/discovery/refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl RefreshService {
for chunk in entries.chunks(16) {
let mut tasks = Vec::new();
for bucket_entry in chunk {
if bucket_entry.is_connected() {
if bucket_entry.is_connected() || bucket_entry.is_incompatible() {
continue;
}

Expand Down
3 changes: 3 additions & 0 deletions p2p/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub enum Error {
#[error("Invalid message error: {0}")]
InvalidMsg(String),

#[error("Incompatible Peer")]
IncompatiblePeer,

#[error(transparent)]
ParseIntError(#[from] std::num::ParseIntError),

Expand Down
9 changes: 7 additions & 2 deletions p2p/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,14 @@ pub struct VerMsg {
pub protocols: HashMap<ProtocolID, VersionInt>,
}

/// VerAck message acknowledging the receipt of a Version message.
/// VerAck message acknowledges the receipt of a Version message. The message
/// consists of the peer ID and an acknowledgment boolean value indicating
/// whether the version is accepted.
#[derive(Decode, Encode, Debug, Clone)]
pub struct VerAckMsg(pub PeerID);
pub struct VerAckMsg {
pub peer_id: PeerID,
pub ack: bool,
}

/// Shutdown message.
#[derive(Decode, Encode, Debug, Clone)]
Expand Down
56 changes: 35 additions & 21 deletions p2p/src/peer_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,15 @@ impl PeerPool {
pub async fn listen_loop(self: Arc<Self>) {
loop {
let new_conn = self.conn_queue.next().await;
let disconnect_signal = new_conn.disconnect_signal;
let signal = new_conn.disconnect_signal;

let result = self
.new_peer(
new_conn.conn,
&new_conn.direction,
disconnect_signal.clone(),
)
.new_peer(new_conn.conn, &new_conn.direction, signal.clone())
.await;

// Only send a disconnect signal if there is an error when adding a peer.
if result.is_err() {
let _ = disconnect_signal.send(()).await;
let _ = signal.send(result).await;
}
}
}
Expand Down Expand Up @@ -155,12 +152,12 @@ impl PeerPool {
self: &Arc<Self>,
conn: Conn,
conn_direction: &ConnDirection,
disconnect_signal: Sender<()>,
) -> Result<PeerID> {
disconnect_signal: Sender<Result<()>>,
) -> Result<()> {
let endpoint = conn.peer_endpoint()?;
let io_codec = IOCodec::new(conn);

// Do a handshake with a connection before creating a new peer.
// Do a handshake with the connection before creating a new peer.
let pid = self.do_handshake(&io_codec, conn_direction).await?;

// TODO: Consider restricting the subnet for inbound connections
Expand All @@ -184,11 +181,11 @@ impl PeerPool {
let selfc = self.clone();
let pid_c = pid.clone();
let on_disconnect = |result| async move {
if let TaskResult::Completed(_) = result {
if let TaskResult::Completed(result) = result {
if let Err(err) = selfc.remove_peer(&pid_c).await {
error!("Failed to remove peer {pid_c}: {err}");
}
let _ = disconnect_signal.send(()).await;
let _ = disconnect_signal.send(result).await;
}
};

Expand All @@ -200,7 +197,8 @@ impl PeerPool {
self.monitor
.notify(&PeerPoolEvent::NewPeer(pid.clone()).into())
.await;
Ok(pid)

Ok(())
}

/// Checks if the peer list contains a peer with the given peer id
Expand Down Expand Up @@ -244,10 +242,19 @@ impl PeerPool {
) -> Result<PeerID> {
match conn_direction {
ConnDirection::Inbound => {
let pid = self.wait_vermsg(io_codec).await?;
self.send_verack(io_codec).await?;
Ok(pid)
let result = self.wait_vermsg(io_codec).await;
match result {
Ok(_) => {
self.send_verack(io_codec, true).await?;
}
Err(Error::IncompatibleVersion(_)) | Err(Error::UnsupportedProtocol(_)) => {
self.send_verack(io_codec, false).await?;
}
_ => {}
}
result
}

ConnDirection::Outbound => {
self.send_vermsg(io_codec).await?;
self.wait_verack(io_codec).await
Expand Down Expand Up @@ -293,10 +300,13 @@ impl PeerPool {
}

/// Send a Verack message
async fn send_verack(&self, io_codec: &IOCodec) -> Result<()> {
let verack = VerAckMsg(self.id.clone());
async fn send_verack(&self, io_codec: &IOCodec, ack: bool) -> Result<()> {
let verack = VerAckMsg {
peer_id: self.id.clone(),
ack,
};

trace!("Send VerAckMsg");
trace!("Send VerAckMsg {:?}", verack);
io_codec.write(NetMsgCmd::Verack, &verack).await?;
Ok(())
}
Expand All @@ -311,8 +321,12 @@ impl PeerPool {
let payload = get_msg_payload!(Verack, msg);
let (verack, _) = decode::<VerAckMsg>(&payload)?;

trace!("Received VerAckMsg from: {}", verack.0);
Ok(verack.0)
if !verack.ack {
return Err(Error::IncompatiblePeer);
}

trace!("Received VerAckMsg from: {}", verack.peer_id);
Ok(verack.peer_id)
}

/// Check if the new connection has compatible protocols.
Expand Down
21 changes: 15 additions & 6 deletions p2p/src/routing_table/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,28 @@ use rand::{rngs::OsRng, seq::SliceRandom};
pub type EntryStatusFlag = u16;

/// The entry is connected.
pub const CONNECTED_ENTRY: EntryStatusFlag = 0b00001;
pub const CONNECTED_ENTRY: EntryStatusFlag = 0b000001;

/// The entry is disconnected. This will increase the failure counter.
pub const DISCONNECTED_ENTRY: EntryStatusFlag = 0b00010;
pub const DISCONNECTED_ENTRY: EntryStatusFlag = 0b000010;

/// The entry is ready to reconnect, meaning it has either been added and
/// has no connection attempts, or it has been refreshed.
pub const PENDING_ENTRY: EntryStatusFlag = 0b00100;
pub const PENDING_ENTRY: EntryStatusFlag = 0b000100;

/// The entry is unreachable. This will increase the failure counter.
pub const UNREACHABLE_ENTRY: EntryStatusFlag = 0b01000;
pub const UNREACHABLE_ENTRY: EntryStatusFlag = 0b001000;

/// The entry is unstable. This will increase the failure counter.
pub const UNSTABLE_ENTRY: EntryStatusFlag = 0b10000;
pub const UNSTABLE_ENTRY: EntryStatusFlag = 0b010000;

/// The entry is incompatible. This status does not increase the failure
/// counter; instead, the entry persists in the routing table for the lookup
/// process and is only removed when a new entry replaces it.
pub const INCOMPATIBLE_ENTRY: EntryStatusFlag = 0b100000;

#[allow(dead_code)]
pub const ALL_ENTRY: EntryStatusFlag = 0b11111;
pub const ALL_ENTRY: EntryStatusFlag = 0b111111;

/// A BucketEntry represents a peer in the routing table.
#[derive(Clone, Debug)]
Expand All @@ -38,6 +43,10 @@ impl BucketEntry {
self.status ^ CONNECTED_ENTRY == 0
}

/// Returns `true` when the entry's status is exactly `INCOMPATIBLE_ENTRY`.
pub fn is_incompatible(&self) -> bool {
    // `a ^ b == 0` holds precisely when `a == b`, so compare directly.
    self.status == INCOMPATIBLE_ENTRY
}

/// Returns `true` when the entry's status is exactly `UNREACHABLE_ENTRY`.
pub fn is_unreachable(&self) -> bool {
    // `a ^ b == 0` holds precisely when `a == b`, so compare directly.
    self.status == UNREACHABLE_ENTRY
}
Expand Down
12 changes: 10 additions & 2 deletions p2p/src/routing_table/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
mod bucket;
mod entry;
pub use bucket::{
Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY,
UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY,
PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
};
pub use entry::{xor_distance, Entry, Key};

Expand Down Expand Up @@ -82,6 +82,14 @@ impl RoutingTable {
return AddEntryResult::Added;
}

// Replace an existing incompatible entry with the new entry, if one exists.
let incompatible_entry = bucket.iter().find(|e| e.is_incompatible()).cloned();
if let Some(e) = incompatible_entry {
bucket.remove(&e.entry.key);
bucket.add(&entry);
return AddEntryResult::Added;
}

// If the bucket is full, the entry is ignored.
AddEntryResult::Ignored
}
Expand Down

0 comments on commit 34b0a91

Please sign in to comment.