Skip to content

Commit

Permalink
rch::bin: support forwarding
Browse files Browse the repository at this point in the history
Support automatic forwarding of binary channels.
  • Loading branch information
surban committed Apr 2, 2024
1 parent 7f6c561 commit 8b78440
Show file tree
Hide file tree
Showing 6 changed files with 315 additions and 52 deletions.
25 changes: 19 additions & 6 deletions remoc/src/rch/bin/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//! A channel that exchanges binary data with a remote endpoint.
//!
//! Allow low-overhead exchange of binary data.
//! One end of the channel must be local while the other end must be remote.
//! Forwarding is not supported.
//! Allows low-overhead exchange of binary data.
//!
//! At least one end of the channel must be remote.
//! Forwarding, i.e. both channel ends on remote endpoints, is supported.
//!
//! If the sole use is to transfer a large binary object into one direction,
//! consider using a [lazy blob](crate::robj::lazy_blob) instead.
Expand All @@ -25,9 +26,21 @@ use super::interlock::{Interlock, Location};
pub fn channel() -> (Sender, Receiver) {
let (sender_tx, sender_rx) = tokio::sync::mpsc::unbounded_channel();
let (receiver_tx, receiver_rx) = tokio::sync::mpsc::unbounded_channel();
let interlock = Arc::new(Mutex::new(Interlock { sender: Location::Local, receiver: Location::Local }));
let interlock = Arc::new(Mutex::new(Interlock::new()));

let sender = Sender { sender: None, sender_rx, receiver_tx: Some(receiver_tx), interlock: interlock.clone() };
let receiver = Receiver { receiver: None, sender_tx: Some(sender_tx), receiver_rx, interlock };
let sender = Sender {
sender: None,
sender_rx,
receiver_tx: Some(receiver_tx),
interlock: interlock.clone(),
successor_tx: std::sync::Mutex::new(None),
};
let receiver = Receiver {
receiver: None,
sender_tx: Some(sender_tx),
receiver_rx,
interlock,
successor_tx: std::sync::Mutex::new(None),
};
(sender, receiver)
}
86 changes: 64 additions & 22 deletions remoc/src/rch/bin/receiver.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures::FutureExt;
use serde::{ser, Deserialize, Serialize};
use serde::{Deserialize, Serialize};
use std::{
fmt,
fmt, mem,
sync::{Arc, Mutex},
};

Expand All @@ -20,6 +20,7 @@ pub struct Receiver {
pub(super) sender_tx: Option<tokio::sync::mpsc::UnboundedSender<Result<chmux::Sender, ConnectError>>>,
pub(super) receiver_rx: tokio::sync::mpsc::UnboundedReceiver<Result<chmux::Receiver, ConnectError>>,
pub(super) interlock: Arc<Mutex<Interlock>>,
pub(super) successor_tx: std::sync::Mutex<Option<tokio::sync::oneshot::Sender<Self>>>,
}

impl fmt::Debug for Receiver {
Expand Down Expand Up @@ -53,7 +54,17 @@ impl Receiver {
/// to the remote endpoint.
pub async fn into_inner(mut self) -> Result<chmux::Receiver, ConnectError> {
self.connect().await;
self.receiver.unwrap()
self.receiver.take().unwrap()
}

/// Forward data.
async fn forward(successor_rx: tokio::sync::oneshot::Receiver<Self>, tx: super::Sender) {
let Ok(rx) = successor_rx.await else { return };
let Ok(mut rx) = rx.into_inner().await else { return };
let Ok(mut tx) = tx.into_inner().await else { return };
if let Err(err) = chmux::forward(&mut rx, &mut tx).await {
tracing::debug!("forwarding binary channel failed: {err}");
}
}
}

Expand All @@ -63,34 +74,48 @@ impl Serialize for Receiver {
where
S: serde::Serializer,
{
let sender_tx =
self.sender_tx.clone().ok_or_else(|| ser::Error::custom("cannot forward received receiver"))?;

let sender_tx = self.sender_tx.clone();
let interlock_confirm = {
let mut interlock = self.interlock.lock().unwrap();
if !interlock.sender.check_local() {
return Err(ser::Error::custom("cannot send receiver because sender has been sent"));
if interlock.sender.check_local() {
Some(interlock.sender.start_send())
} else {
None
}
interlock.sender.start_send()
};

let port = PortSerializer::connect(|connect| {
async move {
let _ = interlock_confirm.send(());
match (sender_tx, interlock_confirm) {
// Local-remote connection.
(Some(sender_tx), Some(interlock_confirm)) => {
let port = PortSerializer::connect(|connect| {
async move {
let _ = interlock_confirm.send(());

match connect.await {
Ok((raw_tx, _)) => {
let _ = sender_tx.send(Ok(raw_tx));
match connect.await {
Ok((raw_tx, _)) => {
let _ = sender_tx.send(Ok(raw_tx));
}
Err(err) => {
let _ = sender_tx.send(Err(ConnectError::Connect(err)));
}
}
}
Err(err) => {
let _ = sender_tx.send(Err(ConnectError::Connect(err)));
}
}
.boxed()
})?;

TransportedReceiver { port }.serialize(serializer)
}
.boxed()
})?;

TransportedReceiver { port }.serialize(serializer)
// Forwarding.
_ => {
let (successor_tx, successor_rx) = tokio::sync::oneshot::channel();
*self.successor_tx.lock().unwrap() = Some(successor_tx);
let (tx, rx) = super::channel();
PortSerializer::spawn(Self::forward(successor_rx, tx))?;

rx.serialize(serializer)
}
}
}
}

Expand Down Expand Up @@ -122,6 +147,23 @@ impl<'de> Deserialize<'de> for Receiver {
sender_tx: None,
receiver_rx,
interlock: Arc::new(Mutex::new(Interlock { sender: Location::Remote, receiver: Location::Local })),
successor_tx: std::sync::Mutex::new(None),
})
}
}

impl Drop for Receiver {
fn drop(&mut self) {
let successor_tx = self.successor_tx.lock().unwrap().take();
if let Some(successor_tx) = successor_tx {
let dummy = Self {
receiver: None,
sender_tx: None,
receiver_rx: tokio::sync::mpsc::unbounded_channel().1,
interlock: Arc::new(Mutex::new(Interlock::new())),
successor_tx: std::sync::Mutex::new(None),
};
let _ = successor_tx.send(mem::replace(self, dummy));
}
}
}
86 changes: 64 additions & 22 deletions remoc/src/rch/bin/sender.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures::FutureExt;
use serde::{ser, Deserialize, Serialize};
use serde::{Deserialize, Serialize};
use std::{
fmt,
fmt, mem,
sync::{Arc, Mutex},
};

Expand All @@ -20,6 +20,7 @@ pub struct Sender {
pub(super) sender_rx: tokio::sync::mpsc::UnboundedReceiver<Result<chmux::Sender, ConnectError>>,
pub(super) receiver_tx: Option<tokio::sync::mpsc::UnboundedSender<Result<chmux::Receiver, ConnectError>>>,
pub(super) interlock: Arc<Mutex<Interlock>>,
pub(super) successor_tx: std::sync::Mutex<Option<tokio::sync::oneshot::Sender<Self>>>,
}

impl fmt::Debug for Sender {
Expand Down Expand Up @@ -53,7 +54,17 @@ impl Sender {
/// to the remote endpoint.
pub async fn into_inner(mut self) -> Result<chmux::Sender, ConnectError> {
self.connect().await;
self.sender.unwrap()
self.sender.take().unwrap()
}

/// Forward data.
async fn forward(successor_rx: tokio::sync::oneshot::Receiver<Self>, rx: super::Receiver) {
let Ok(tx) = successor_rx.await else { return };
let Ok(mut tx) = tx.into_inner().await else { return };
let Ok(mut rx) = rx.into_inner().await else { return };
if let Err(err) = chmux::forward(&mut rx, &mut tx).await {
tracing::debug!("forwarding binary channel failed: {err}");
}
}
}

Expand All @@ -63,34 +74,48 @@ impl Serialize for Sender {
where
S: serde::Serializer,
{
let receiver_tx =
self.receiver_tx.clone().ok_or_else(|| ser::Error::custom("cannot forward received sender"))?;

let receiver_tx = self.receiver_tx.clone();
let interlock_confirm = {
let mut interlock = self.interlock.lock().unwrap();
if !interlock.receiver.check_local() {
return Err(ser::Error::custom("cannot send sender because receiver has been sent"));
if interlock.receiver.check_local() {
Some(interlock.receiver.start_send())
} else {
None
}
interlock.receiver.start_send()
};

let port = PortSerializer::connect(|connect| {
async move {
let _ = interlock_confirm.send(());
match (receiver_tx, interlock_confirm) {
// Local-remote connection.
(Some(receiver_tx), Some(interlock_confirm)) => {
let port = PortSerializer::connect(|connect| {
async move {
let _ = interlock_confirm.send(());

match connect.await {
Ok((_, raw_rx)) => {
let _ = receiver_tx.send(Ok(raw_rx));
match connect.await {
Ok((_, raw_rx)) => {
let _ = receiver_tx.send(Ok(raw_rx));
}
Err(err) => {
let _ = receiver_tx.send(Err(ConnectError::Connect(err)));
}
}
}
Err(err) => {
let _ = receiver_tx.send(Err(ConnectError::Connect(err)));
}
}
.boxed()
})?;

TransportedSender { port }.serialize(serializer)
}
.boxed()
})?;

TransportedSender { port }.serialize(serializer)
// Forwarding.
_ => {
let (successor_tx, successor_rx) = tokio::sync::oneshot::channel();
*self.successor_tx.lock().unwrap() = Some(successor_tx);
let (tx, rx) = super::channel();
PortSerializer::spawn(Self::forward(successor_rx, rx))?;

tx.serialize(serializer)
}
}
}
}

Expand Down Expand Up @@ -122,6 +147,23 @@ impl<'de> Deserialize<'de> for Sender {
sender_rx,
receiver_tx: None,
interlock: Arc::new(Mutex::new(Interlock { sender: Location::Local, receiver: Location::Remote })),
successor_tx: std::sync::Mutex::new(None),
})
}
}

impl Drop for Sender {
fn drop(&mut self) {
let successor_tx = self.successor_tx.lock().unwrap().take();
if let Some(successor_tx) = successor_tx {
let dummy = Self {
sender: None,
sender_rx: tokio::sync::mpsc::unbounded_channel().1,
receiver_tx: None,
interlock: Arc::new(Mutex::new(Interlock::new())),
successor_tx: std::sync::Mutex::new(None),
};
let _ = successor_tx.send(mem::replace(self, dummy));
}
}
}
7 changes: 7 additions & 0 deletions remoc/src/rch/interlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ pub(crate) struct Interlock {
pub receiver: Location,
}

impl Interlock {
/// Creates a new interlock with local sender and receiver locations.
pub fn new() -> Self {
Self { sender: Location::Local, receiver: Location::Local }
}
}

/// Location of a sender or receiver.
pub(crate) enum Location {
Local,
Expand Down
4 changes: 2 additions & 2 deletions remoc/src/rch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
//! A [binary channel](bin) can be used to exchange binary data over a channel.
//! It skips serialization and deserialization and thus is more efficient for binary data,
//! especially when using text codecs such as JSON.
//! However, it does not support forwarding and exactly one half of it must be on a remote
//! endpoint.
//! It does support forwarding.
//! However, at least one half of it must be on a remote endpoint.
//!
//! # Acknowledgements and connection latency
//!
Expand Down
Loading

0 comments on commit 8b78440

Please sign in to comment.