Skip to content

Commit

Permalink
feat!: distinguish peer and peer_id on api level (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
morph-dev authored Feb 4, 2025
1 parent ef62dd9 commit a4da863
Show file tree
Hide file tree
Showing 12 changed files with 275 additions and 129 deletions.
3 changes: 1 addition & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,4 @@ workflows:
- rust/lint-test-build:
clippy_arguments: '--all-targets --all-features -- --deny warnings'
release: true
version: 1.71.1

version: 1.81.0
11 changes: 1 addition & 10 deletions src/cid.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
use std::fmt::Debug;
use std::hash::Hash;
use std::net::SocketAddr;

/// A remote peer.
pub trait ConnectionPeer: Clone + Debug + Eq + Hash + PartialEq + Send + Sync {}

impl ConnectionPeer for SocketAddr {}

#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
pub struct ConnectionId<P> {
pub send: u16,
pub recv: u16,
pub peer: P,
pub peer_id: P,
}
40 changes: 23 additions & 17 deletions src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ use delay_map::HashMapDelay;
use futures::StreamExt;
use tokio::sync::{mpsc, oneshot, Notify};

use crate::cid::{ConnectionId, ConnectionPeer};
use crate::cid::ConnectionId;
use crate::congestion;
use crate::event::{SocketEvent, StreamEvent};
use crate::packet::{Packet, PacketBuilder, PacketType, SelectiveAck};
use crate::peer::ConnectionPeer;
use crate::peer::Peer;
use crate::recv::ReceiveBuffer;
use crate::send::SendBuffer;
use crate::sent::{SentPackets, SentPacketsError};
Expand Down Expand Up @@ -167,9 +169,10 @@ impl From<ConnectionConfig> for congestion::Config {
}
}

pub struct Connection<const N: usize, P> {
pub struct Connection<const N: usize, P: ConnectionPeer> {
state: State<N>,
cid: ConnectionId<P>,
cid: ConnectionId<P::Id>,
peer: Peer<P>,
config: ConnectionConfig,
endpoint: Endpoint,
peer_ts_diff: Duration,
Expand All @@ -185,7 +188,8 @@ pub struct Connection<const N: usize, P> {

impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
pub fn new(
cid: ConnectionId<P>,
cid: ConnectionId<P::Id>,
peer: Peer<P>,
config: ConnectionConfig,
syn: Option<Packet>,
connected: oneshot::Sender<io::Result<()>>,
Expand All @@ -212,6 +216,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
Self {
state: State::Connecting(Some(connected)),
cid,
peer,
config,
endpoint,
peer_ts_diff,
Expand All @@ -232,15 +237,15 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
mut writes: mpsc::UnboundedReceiver<Write>,
mut shutdown: oneshot::Receiver<()>,
) -> io::Result<()> {
tracing::debug!("uTP conn starting... {:?}", self.cid.peer);
tracing::debug!("uTP conn starting... {:?}", self.peer);

// If we are the initiating endpoint, then send the SYN. If we are the accepting endpoint,
// then send the SYN-ACK.
match self.endpoint {
Endpoint::Initiator((syn_seq_num, ..)) => {
let syn = self.syn_packet(syn_seq_num);
self.socket_events
.send(SocketEvent::Outgoing((syn.clone(), self.cid.peer.clone())))
.send(SocketEvent::Outgoing((syn.clone(), self.peer.clone())))
.unwrap();
self.unacked
.insert_at(syn_seq_num, syn, self.config.initial_timeout);
Expand All @@ -250,7 +255,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
Endpoint::Acceptor((syn, syn_ack)) => {
let state = self.state_packet().unwrap();
self.socket_events
.send(SocketEvent::Outgoing((state, self.cid.peer.clone())))
.send(SocketEvent::Outgoing((state, self.peer.clone())))
.unwrap();

let recv_buf = ReceiveBuffer::new(syn);
Expand Down Expand Up @@ -409,7 +414,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
&mut self.unacked,
&mut self.socket_events,
fin,
&self.cid.peer,
&self.peer,
Instant::now(),
);
}
Expand Down Expand Up @@ -441,7 +446,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
&mut self.unacked,
&mut self.socket_events,
fin,
&self.cid.peer,
&self.peer,
Instant::now(),
);
}
Expand Down Expand Up @@ -542,7 +547,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
&mut self.unacked,
&mut self.socket_events,
packet,
&self.cid.peer,
&self.peer,
now,
);
seq_num = seq_num.wrapping_add(1);
Expand Down Expand Up @@ -680,7 +685,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
let packet = self.syn_packet(seq);
let _ = self
.socket_events
.send(SocketEvent::Outgoing((packet, self.cid.peer.clone())));
.send(SocketEvent::Outgoing((packet, self.peer.clone())));
}
}
Endpoint::Acceptor(..) => {}
Expand Down Expand Up @@ -728,7 +733,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
&mut self.unacked,
&mut self.socket_events,
packet,
&self.cid.peer,
&self.peer,
now,
);
}
Expand Down Expand Up @@ -784,7 +789,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
match packet.packet_type() {
PacketType::Syn | PacketType::Fin | PacketType::Data => {
if let Some(state) = self.state_packet() {
let event = SocketEvent::Outgoing((state, self.cid.peer.clone()));
let event = SocketEvent::Outgoing((state, self.peer.clone()));
if self.socket_events.send(event).is_err() {
tracing::warn!("Cannot transmit state packet: socket closed channel");
return;
Expand Down Expand Up @@ -1156,7 +1161,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
&mut self.unacked,
&mut self.socket_events,
packet,
&self.cid.peer,
&self.peer,
now,
);
}
Expand All @@ -1167,7 +1172,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
unacked: &mut HashMapDelay<u16, Packet>,
socket_events: &mut mpsc::UnboundedSender<SocketEvent<P>>,
packet: Packet,
dest: &P,
peer: &Peer<P>,
now: Instant,
) {
let (payload, len) = if packet.payload().is_empty() {
Expand All @@ -1189,7 +1194,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {

sent_packets.on_transmit(packet.seq_num(), packet.packet_type(), payload, len, now);
unacked.insert_at(packet.seq_num(), packet.clone(), sent_packets.timeout());
let outbound = SocketEvent::Outgoing((packet, dest.clone()));
let outbound = SocketEvent::Outgoing((packet, peer.clone()));
if socket_events.send(outbound).is_err() {
tracing::warn!("Cannot transmit packet: socket closed channel");
}
Expand All @@ -1214,12 +1219,13 @@ mod test {
let cid = ConnectionId {
send: 101,
recv: 100,
peer,
peer_id: peer,
};

Connection {
state: State::Connecting(Some(connected)),
cid,
peer: Peer::new(peer),
config: ConnectionConfig::default(),
endpoint,
peer_ts_diff: Duration::from_millis(100),
Expand Down
7 changes: 4 additions & 3 deletions src/event.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::cid::ConnectionId;
use crate::packet::Packet;
use crate::peer::{ConnectionPeer, Peer};

#[derive(Clone, Debug)]
pub enum StreamEvent {
Expand All @@ -8,7 +9,7 @@ pub enum StreamEvent {
}

#[derive(Clone, Debug)]
pub enum SocketEvent<P> {
Outgoing((Packet, P)),
Shutdown(ConnectionId<P>),
pub enum SocketEvent<P: ConnectionPeer> {
Outgoing((Packet, Peer<P>)),
Shutdown(ConnectionId<P::Id>),
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod congestion;
pub mod conn;
pub mod event;
pub mod packet;
pub mod peer;
pub mod recv;
pub mod send;
pub mod sent;
Expand Down
96 changes: 96 additions & 0 deletions src/peer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::fmt::Debug;
use std::hash::Hash;
use std::net::SocketAddr;

/// A trait that describes remote peer
pub trait ConnectionPeer: Debug + Clone + Send + Sync {
type Id: Debug + Clone + PartialEq + Eq + Hash + Send + Sync;

/// Returns peer's id
fn id(&self) -> Self::Id;

/// Consolidates two peers into one.
///
/// It's possible that we have two instances that represent the same peer (equal `peer_id`),
/// and we need to consolidate them into one. This can happen when [Peer]-s passed with
/// [UtpSocket::accept_with_cid](crate::socket::UtpSocket::accept_with_cid) or
/// [UtpSocket::connect_with_cid](crate::socket::UtpSocket::connect_with_cid), and returned by
/// [AsyncUdpSocket::recv_from](crate::udp::AsyncUdpSocket::recv_from) contain peers (not just
/// `peer_id`).
///
/// The structure implementing this trait can decide on the exact behavior. Some examples:
/// - If structure is simple (i.e. two peers are the same iff all fields are the same), return
/// either (see implementation for `SocketAddr`)
/// - If we can determine which peer is newer (e.g. using timestamp or version field), return
/// newer peer
/// - If structure behaves more like a key-value map whose values don't change over time,
/// merge key-value pairs from both instances into one
///
/// Should panic if ids are not matching.
fn consolidate(a: Self, b: Self) -> Self;
}

impl ConnectionPeer for SocketAddr {
type Id = Self;

fn id(&self) -> Self::Id {
*self
}

fn consolidate(a: Self, b: Self) -> Self {
assert!(a == b, "Consolidating non-equal peers");
a
}
}

/// Structure that stores peer's id, and maybe peer as well.
#[derive(Debug, Clone)]
pub struct Peer<P: ConnectionPeer> {
id: P::Id,
peer: Option<P>,
}

impl<P: ConnectionPeer> Peer<P> {
/// Creates new instance that stores peer
pub fn new(peer: P) -> Self {
Self {
id: peer.id(),
peer: Some(peer),
}
}

/// Creates new instance that only stores peer's id
pub fn new_id(peer_id: P::Id) -> Self {
Self {
id: peer_id,
peer: None,
}
}

/// Returns peer's id
pub fn id(&self) -> &P::Id {
&self.id
}

/// Returns optional reference to peer
pub fn peer(&self) -> Option<&P> {
self.peer.as_ref()
}

/// Consolidates given peer into `Self` whilst consuming it.
///
/// See [ConnectionPeer::consolidate] for details.
///
/// Panics if ids are not matching.
pub fn consolidate(&mut self, other: Self) {
assert!(self.id == other.id, "Consolidating with non-equal peer");
let Some(other_peer) = other.peer else {
return;
};

self.peer = match self.peer.take() {
Some(peer) => Some(P::consolidate(peer, other_peer)),
None => Some(other_peer),
};
}
}
Loading

0 comments on commit a4da863

Please sign in to comment.