Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async I/O based Laminar implementation: Initial Pass. #291

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@ autobenches = false
edition = "2018"

[dependencies]
async-channel = "1.6"
async-net = "1.6.0"
futures-timer = "3.0"
bevy_tasks = "0.5"
tracing = "0.1"
byteorder = "1.4.3"
crc = "1.8"
crossbeam-channel = "0.5"
dashmap = "4.0"
lazy_static = "1.4"
log = "0.4"
rand = "0.8"
Expand All @@ -38,6 +44,9 @@ serde = "1.0"
serde_derive = "1.0"
quickcheck = "1.0"
quickcheck_macros = "1.0"
static_assertions = "1.1"
futures = "0.3"
serial_test = "0.5"

[features]
tester = [
Expand All @@ -52,4 +61,4 @@ harness = false

[[bin]]
name = "laminar-tester"
required-features = ["tester"]
required-features = ["tester"]
1 change: 1 addition & 0 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ mod quality;
mod socket;
mod virtual_connection;

pub mod aio;
pub mod constants;
224 changes: 224 additions & 0 deletions src/net/aio/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
use async_channel::{TryRecvError, TrySendError};

#[derive(Clone)]
pub struct BidirectionalAsyncChannel<T> {
incoming: async_channel::Receiver<T>,
outgoing: async_channel::Sender<T>,
}

impl<T> BidirectionalAsyncChannel<T> {
/// Creates a pair of connected Peers without limitations on how many messages can be
/// buffered.
pub fn create_unbounded_pair() -> (Self, Self) {
Self::create_pair(async_channel::unbounded(), async_channel::unbounded())
}

/// Creates a pair of connected Peers with a limited capacity for many messages can be
/// buffered in either direction.
pub fn create_bounded_pair(capacity: usize) -> (Self, Self) {
Self::create_pair(
async_channel::bounded(capacity),
async_channel::bounded(capacity),
)
}

/// Sends a message to the connected peer.
///
/// If the send buffer is full, this method waits until there is space for a message.
///
/// If the peer is disconnected, this method returns an error.
#[inline]
pub fn send(&self, message: T) -> async_channel::Send<'_, T> {
self.outgoing.send(message)
}

/// Receives a message from the connected peer.
///
/// If there is no pending messages, this method waits until there is a message.
///
/// If the peer is disconnected, this method receives a message or returns an error if there
/// are no more messages.
#[inline]
pub fn recv(&self) -> async_channel::Recv<'_, T> {
self.incoming.recv()
}

/// Attempts to send a message to the connected peer.
#[inline]
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
self.outgoing.try_send(message)
}

/// Attempts to receive a message from the connected peer.
#[inline]
pub fn try_recv(&self) -> Result<T, TryRecvError> {
self.incoming.try_recv()
}

/// Returns true if the associated peer is still connected.
pub fn is_connected(&self) -> bool {
!self.incoming.is_closed() && !self.outgoing.is_closed()
}

/// Disconnects the paired Peers from either end. Any future attempts to send messages in
/// either direction will fail, but any messages not yet recieved.
///
/// If the Peer, or it's constituent channels were cloned, all of the cloned instances will
/// appear disconnected.
pub fn disconnect(&self) {
self.outgoing.close();
self.incoming.close();
}

/// Gets the raw sender for the peer.
pub fn sender(&self) -> async_channel::Sender<T> {
self.outgoing.clone()
}

/// Gets the raw reciever for the peer.
pub fn reciever(&self) -> async_channel::Receiver<T> {
self.incoming.clone()
}

/// The number of messages that are currently buffered in the send queue. Returns 0 if the
/// channel is closed.
pub fn pending_send_count(&self) -> usize {
self.outgoing.len()
}

/// The number of messages that are currently buffered in the recieve queue. Returns 0 if the
/// channel is closed.
pub fn pending_recv_count(&self) -> usize {
self.incoming.len()
}

fn create_pair(
a: (async_channel::Sender<T>, async_channel::Receiver<T>),
b: (async_channel::Sender<T>, async_channel::Receiver<T>),
) -> (Self, Self) {
let (a_send, a_recv) = a;
let (b_send, b_recv) = b;
let a = Self {
incoming: a_recv,
outgoing: b_send,
};
let b = Self {
incoming: b_recv,
outgoing: a_send,
};
(a, b)
}
}

#[cfg(test)]
mod test {
use super::*;

static_assertions::assert_impl_all!(BidirectionalAsyncChannel<i32>: Clone);

#[test]
pub fn send_works_both_ways() {
let (a, b) = BidirectionalAsyncChannel::<i32>::create_unbounded_pair();

assert!(a.try_send(1).is_ok());
assert!(b.try_send(4).is_ok());
assert!(a.try_send(2).is_ok());
assert!(b.try_send(5).is_ok());
assert!(a.try_send(3).is_ok());
assert!(b.try_send(6).is_ok());

assert_eq!(a.pending_send_count(), 3);
assert_eq!(b.pending_send_count(), 3);
assert_eq!(a.pending_recv_count(), 3);
assert_eq!(b.pending_recv_count(), 3);

assert_eq!(a.try_recv(), Ok(4));
assert_eq!(a.try_recv(), Ok(5));
assert_eq!(a.try_recv(), Ok(6));

assert_eq!(b.try_recv(), Ok(1));
assert_eq!(b.try_recv(), Ok(2));
assert_eq!(b.try_recv(), Ok(3));
}

#[test]
pub fn bounded_pairs_error_on_being_full() {
let (a, b) = BidirectionalAsyncChannel::<i32>::create_bounded_pair(2);

assert!(a.try_send(1).is_ok());
assert!(a.try_send(2).is_ok());
assert!(matches!(a.try_send(3), Err(TrySendError::Full(3))));
assert!(b.try_send(4).is_ok());
assert!(b.try_send(5).is_ok());
assert!(matches!(b.try_send(6), Err(TrySendError::Full(6))));

assert_eq!(a.try_recv(), Ok(4));
assert_eq!(a.try_recv(), Ok(5));
assert_eq!(a.try_recv(), Err(TryRecvError::Empty));

assert_eq!(b.try_recv(), Ok(1));
assert_eq!(b.try_recv(), Ok(2));
assert_eq!(a.try_recv(), Err(TryRecvError::Empty));
}

#[test]
pub fn disconnecting_closes_both_sides() {
let (a, b) = BidirectionalAsyncChannel::<i32>::create_bounded_pair(2);

a.disconnect();
assert!(!a.is_connected());
assert!(!b.is_connected());

let (a, b) = BidirectionalAsyncChannel::<i32>::create_bounded_pair(2);

b.disconnect();
assert!(!a.is_connected());
assert!(!b.is_connected());
}

#[test]
pub fn disconnecting_stop_any_future_sends() {
let (a, b) = BidirectionalAsyncChannel::<i32>::create_bounded_pair(2);

a.disconnect();
assert!(!a.is_connected());
assert!(!b.is_connected());

assert!(matches!(a.try_send(1), Err(TrySendError::Closed(1))));
assert!(matches!(b.try_send(1), Err(TrySendError::Closed(1))));
assert!(matches!(a.try_recv(), Err(TryRecvError::Closed)));
assert!(matches!(b.try_recv(), Err(TryRecvError::Closed)));
}

#[test]
pub fn disconnecting_allows_existing_items_to_be_flushed() {
let (a, b) = BidirectionalAsyncChannel::<i32>::create_unbounded_pair();

assert!(a.try_send(1).is_ok());
assert!(a.try_send(2).is_ok());
a.disconnect();
assert!(matches!(a.try_send(3), Err(TrySendError::Closed(3))));

assert_eq!(b.try_recv(), Ok(1));
assert_eq!(b.try_recv(), Ok(2));
assert_eq!(b.try_recv(), Err(TryRecvError::Closed));
}

#[test]
pub fn dropping_leads_to_disconnect() {
let (a, b) = BidirectionalAsyncChannel::<i32>::create_unbounded_pair();

assert!(a.is_connected());
drop(b);
assert!(!a.is_connected());

let (a, b) = BidirectionalAsyncChannel::<i32>::create_unbounded_pair();
let c = b.clone();

assert!(a.is_connected());
drop(b);
assert!(a.is_connected());
drop(c);
assert!(!a.is_connected());
}
}
21 changes: 21 additions & 0 deletions src/net/aio/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
mod channel;
mod peer;
mod peers;

pub mod udp;
pub use channel::*;
pub use peer::*;
pub use peers::*;

/// Forwards all messages from one reciever to a sender until either the sender or reciever are
/// closed.
pub async fn forward<T>(
input: async_channel::Receiver<T>,
output: async_channel::Sender<T>,
) {
while let Ok(message) = input.recv().await {
if let Err(_) = output.send(message).await {
break;
}
}
}
84 changes: 84 additions & 0 deletions src/net/aio/peer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use crate::net::LinkConditioner;
use super::channel::BidirectionalAsyncChannel;
use futures_timer::Delay;
use bevy_tasks::TaskPool;
use std::fmt;
use std::ops::Deref;

/// A bidirectional channel for binary messages.
#[derive(Clone)]
pub struct Peer(BidirectionalAsyncChannel<Box<[u8]>>);

impl Peer {
/// Creates a pair of connected Peers without limitations on how many messages can be
/// buffered.
pub fn create_unbounded_pair() -> (Self, Self) {
let (a, b) = BidirectionalAsyncChannel::create_unbounded_pair();
(Self(a), Self(b))
}

/// Creates a pair of connected Peers with a limited capacity for many messages can be
/// buffered in either direction.
pub fn create_bounded_pair(capacity: usize) -> (Self, Self) {
let (a, b) = BidirectionalAsyncChannel::create_bounded_pair(capacity);
(Self(a), Self(b))
}

/// Converts the peer into a conditioned one. All outgoing sends will be randomly dropped
/// and have additional latency added based on the provided LinkConditioner.
///
/// Useful for locally testing high latency or packet loss conditions.
///
/// It is strongly advised not to use this in a release build as it might introduce
/// unnecessary packet loss and latency.
pub fn with_link_conditioner(self, pool: &TaskPool, conditioner: LinkConditioner) -> Self {
let (a, b) = Self::create_unbounded_pair();
pool.spawn(Self::conditioned_send(
pool.clone(), b.reciever(), conditioner, self.sender())).detach();
pool.spawn(super::forward(self.reciever(), b.sender())).detach();
a
}

async fn conditioned_send(
pool: TaskPool,
input: async_channel::Receiver<Box<[u8]>>,
mut conditioner: LinkConditioner,
output: async_channel::Sender<Box<[u8]>>
) {
while let Ok(message) = input.recv().await {
if !conditioner.should_send() {
continue;
}

if output.is_closed() {
break;
}

let latency = conditioner.sample_latency();
let output = output.clone();
pool.spawn(async move {
Delay::new(latency).await;
output.send(message).await;
});
}
}
}

impl fmt::Debug for Peer {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Peer {{ connected: {} }}", self.is_connected())
}
}

impl Deref for Peer {
type Target = BidirectionalAsyncChannel<Box<[u8]>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}

#[cfg(test)]
mod test {
use super::*;
static_assertions::assert_impl_all!(Peer: Deref, Clone, Send, Sync);
}
Loading