From da5ae3e46a3daa6489a98fd2c7d198e83eee86b4 Mon Sep 17 00:00:00 2001 From: Milos Stankovic <82043364+morph-dev@users.noreply.github.com> Date: Mon, 3 Feb 2025 20:10:51 +0200 Subject: [PATCH] refactor: rename "merge" to "consolidate" and update documentation --- src/peer.rs | 37 ++++++++++++++++++++++++++----------- src/socket.rs | 4 ++-- src/testutils.rs | 5 +++-- 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/src/peer.rs b/src/peer.rs index 0b0ac16..f668276 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -9,10 +9,25 @@ pub trait ConnectionPeer: Debug + Clone + Send + Sync { /// Returns peer's id fn id(&self) -> Self::Id; - /// Merge the given object into `Self` whilst consuming it. + /// Consolidates two peers into one. + /// + /// It's possible that we have two instances that represent the same peer (equal `peer_id`), + /// and we need to consolidate them into one. This can happen when [Peer]-s passed with + /// [UtpSocket::accept_with_cid](crate::socket::UtpSocket::accept_with_cid) or + /// [UtpSocket::connect_with_cid](crate::socket::UtpSocket::connect_with_cid), and returned by + /// [AsyncUdpSocket::recv_from](crate::udp::AsyncUdpSocket::recv_from) contain peers (not just + /// `peer_id`). + /// + /// The structure implementing this trait can decide on the exact behavior. Some examples: + /// - If structure is simple (i.e. two peers are the same iff all fields are the same), return + /// either (see implementation for `SocketAddr`) + /// - If we can determine which peer is newer (e.g. using timestamp or version field), return + /// newer peer + /// - If structure behaves more like a key-value map whose values don't change over time, + /// merge key-value pairs from both instances into one /// /// Should panic if ids are not matching. - fn merge(&mut self, other: Self); + fn consolidate(a: Self, b: Self) -> Self; } impl ConnectionPeer for SocketAddr { @@ -22,8 +37,9 @@ impl ConnectionPeer for SocketAddr { *self } - fn merge(&mut self, other: Self) { - assert!(self == &other) + fn consolidate(a: Self, b: Self) -> Self { + assert!(a == b, "Consolidating non-equal peers"); + a } } @@ -61,20 +77,19 @@ impl Peer

{ self.peer.as_ref() } - /// Merge the given peer into `Self` whilst consuming it. + /// Consolidates given peer into `Self` whilst consuming it. + /// + /// See [ConnectionPeer::consolidate] for details. /// /// Panics if ids are not matching. - pub fn merge(&mut self, other: Self) { - assert!(self.id == other.id); + pub fn consolidate(&mut self, other: Self) { + assert!(self.id == other.id, "Consolidating with non-equal peer"); let Some(other_peer) = other.peer else { return; }; self.peer = match self.peer.take() { - Some(mut peer) => { - peer.merge(other_peer); - Some(peer) - } + Some(peer) => Some(P::consolidate(peer, other_peer)), None => Some(other_peer), }; } diff --git a/src/socket.rs b/src/socket.rs index d219604..ca5dd1b 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -121,7 +121,7 @@ where // create a new stream for that connection. Otherwise, add the // connection to the incoming connections. if let Some(accept_with_cid) = awaiting.remove(&cid) { - peer.merge(accept_with_cid.peer); + peer.consolidate(accept_with_cid.peer); let (connected_tx, connected_rx) = oneshot::channel(); let (events_tx, events_rx) = mpsc::unbounded_channel(); @@ -177,7 +177,7 @@ where awaiting.insert(accept_with_cid.cid.clone(), accept_with_cid); continue; }; - peer.merge(accept_with_cid.peer); + peer.consolidate(accept_with_cid.peer); Self::select_accept_helper(accept_with_cid.cid, peer, syn, conns.clone(), accept_with_cid.accept, socket_event_tx.clone()); } Some(accept) = accepts_rx.recv(), if !incoming_conns.is_empty() => { diff --git a/src/testutils.rs b/src/testutils.rs index 84cb3fd..2ea0f00 100644 --- a/src/testutils.rs +++ b/src/testutils.rs @@ -81,8 +81,9 @@ impl ConnectionPeer for char { *self } - fn merge(&mut self, other: Self) { - assert!(*self == other) + fn consolidate(a: Self, b: Self) -> Self { + assert!(a == b, "Consolidating non-equal peers"); + a } }