Skip to content

Commit

Permalink
refactor: rename "merge" to "consolidate" and update documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
morph-dev committed Feb 3, 2025
1 parent b810965 commit da5ae3e
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 15 deletions.
37 changes: 26 additions & 11 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,25 @@ pub trait ConnectionPeer: Debug + Clone + Send + Sync {
/// Returns peer's id
fn id(&self) -> Self::Id;

/// Merge the given object into `Self` whilst consuming it.
/// Consolidates two peers into one.
///
/// It's possible that we have two instances that represent the same peer (equal `peer_id`),
/// and we need to consolidate them into one. This can happen when [Peer]-s passed with
/// [UtpSocket::accept_with_cid](crate::socket::UtpSocket::accept_with_cid) or
/// [UtpSocket::connect_with_cid](crate::socket::UtpSocket::connect_with_cid), and returned by
/// [AsyncUdpSocket::recv_from](crate::udp::AsyncUdpSocket::recv_from) contain peers (not just
/// `peer_id`).
///
/// The structure implementing this trait can decide on the exact behavior. Some examples:
/// - If structure is simple (i.e. two peers are the same iff all fields are the same), return
/// either (see implementation for `SocketAddr`)
/// - If we can determine which peer is newer (e.g. using timestamp or version field), return
/// newer peer
/// - If structure behaves more like a key-value map whose values don't change over time,
/// merge key-value pairs from both instances into one
///
/// Should panic if ids are not matching.
fn merge(&mut self, other: Self);
fn consolidate(a: Self, b: Self) -> Self;
}

impl ConnectionPeer for SocketAddr {
Expand All @@ -22,8 +37,9 @@ impl ConnectionPeer for SocketAddr {
*self
}

fn merge(&mut self, other: Self) {
assert!(self == &other)
fn consolidate(a: Self, b: Self) -> Self {
assert!(a == b, "Consolidating non-equal peers");
a
}
}

Expand Down Expand Up @@ -61,20 +77,19 @@ impl<P: ConnectionPeer> Peer<P> {
self.peer.as_ref()
}

/// Merge the given peer into `Self` whilst consuming it.
/// Consolidates given peer into `Self` whilst consuming it.
///
/// See [ConnectionPeer::consolidate] for details.
///
/// Panics if ids are not matching.
pub fn merge(&mut self, other: Self) {
assert!(self.id == other.id);
pub fn consolidate(&mut self, other: Self) {
assert!(self.id == other.id, "Consolidating with non-equal peer");
let Some(other_peer) = other.peer else {
return;
};

self.peer = match self.peer.take() {
Some(mut peer) => {
peer.merge(other_peer);
Some(peer)
}
Some(peer) => Some(P::consolidate(peer, other_peer)),
None => Some(other_peer),
};
}
Expand Down
4 changes: 2 additions & 2 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ where
// create a new stream for that connection. Otherwise, add the
// connection to the incoming connections.
if let Some(accept_with_cid) = awaiting.remove(&cid) {
peer.merge(accept_with_cid.peer);
peer.consolidate(accept_with_cid.peer);

let (connected_tx, connected_rx) = oneshot::channel();
let (events_tx, events_rx) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -177,7 +177,7 @@ where
awaiting.insert(accept_with_cid.cid.clone(), accept_with_cid);
continue;
};
peer.merge(accept_with_cid.peer);
peer.consolidate(accept_with_cid.peer);
Self::select_accept_helper(accept_with_cid.cid, peer, syn, conns.clone(), accept_with_cid.accept, socket_event_tx.clone());
}
Some(accept) = accepts_rx.recv(), if !incoming_conns.is_empty() => {
Expand Down
5 changes: 3 additions & 2 deletions src/testutils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ impl ConnectionPeer for char {
*self
}

fn merge(&mut self, other: Self) {
assert!(*self == other)
fn consolidate(a: Self, b: Self) -> Self {
assert!(a == b, "Consolidating non-equal peers");
a
}
}

Expand Down

0 comments on commit da5ae3e

Please sign in to comment.