Skip to content

Commit 048ec6b

Browse files
committed
chmux: port data forwarding
Allow forwarding of port data, including transmitted ports.
1 parent b33b3e9 commit 048ec6b

File tree

3 files changed

+141
-35
lines changed

3 files changed

+141
-35
lines changed

remoc/src/chmux/forward.rs

+135
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
//! Channel data forwarding.
2+
3+
use bytes::Buf;
4+
use std::{fmt, num::Wrapping};
5+
6+
use super::{ConnectError, PortReq, Received, RecvChunkError, RecvError, SendError};
7+
8+
/// An error occurred during forwarding of a message.
9+
#[derive(Debug, Clone)]
10+
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
11+
pub enum ForwardError {
12+
/// Sending failed.
13+
Send(SendError),
14+
/// Receiving failed.
15+
Recv(RecvError),
16+
}
17+
18+
impl From<SendError> for ForwardError {
19+
fn from(err: SendError) -> Self {
20+
Self::Send(err)
21+
}
22+
}
23+
24+
impl From<RecvError> for ForwardError {
25+
fn from(err: RecvError) -> Self {
26+
Self::Recv(err)
27+
}
28+
}
29+
30+
impl fmt::Display for ForwardError {
31+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
32+
match self {
33+
ForwardError::Send(err) => write!(f, "forward send failed: {err}"),
34+
ForwardError::Recv(err) => write!(f, "forward receive failed: {err}"),
35+
}
36+
}
37+
}
38+
39+
impl std::error::Error for ForwardError {}
40+
41+
/// Forwards all data received from a receiver to a sender.
42+
///
43+
/// This also recursively setups forwarding for all transmitted ports.
44+
///
45+
/// Returns the total number of bytes forwarded.
46+
pub async fn forward(rx: &mut super::Receiver, tx: &mut super::Sender) -> Result<usize, ForwardError> {
47+
// Required to avoid borrow checking loop limitation.
48+
fn spawn_forward(id: u32, mut rx: super::Receiver, mut tx: super::Sender) {
49+
tokio::spawn(async move {
50+
if let Err(err) = forward(&mut rx, &mut tx).await {
51+
tracing::debug!("port forwarding for id {id} failed: {err}");
52+
}
53+
});
54+
}
55+
56+
let mut total = Wrapping(0);
57+
58+
loop {
59+
match rx.recv_any().await? {
60+
// Data.
61+
Some(Received::Data(data)) => {
62+
total += data.remaining();
63+
tx.send(data.into()).await?;
64+
}
65+
66+
// Data chunks.
67+
Some(Received::Chunks) => {
68+
let mut chunk_tx = tx.send_chunks();
69+
loop {
70+
match rx.recv_chunk().await {
71+
Ok(Some(chunk)) => {
72+
total += chunk.remaining();
73+
chunk_tx = chunk_tx.send(chunk).await?;
74+
}
75+
Ok(None) => {
76+
chunk_tx.finish().await?;
77+
break;
78+
}
79+
Err(RecvChunkError::Cancelled) => break,
80+
Err(RecvChunkError::ChMux) => return Err(ForwardError::Recv(RecvError::ChMux)),
81+
}
82+
}
83+
}
84+
85+
// Ports.
86+
Some(Received::Requests(reqs)) => {
87+
let allocator = tx.port_allocator();
88+
89+
// Allocate local outgoing ports for forwarding.
90+
let mut ports = Vec::new();
91+
let mut wait = false;
92+
for req in &reqs {
93+
let port = allocator.allocate().await;
94+
ports.push(PortReq::new(port).with_id(req.id()));
95+
wait |= req.is_wait();
96+
}
97+
98+
// Connect them.
99+
let connects = tx.connect(ports, wait).await?;
100+
for (req, connect) in reqs.into_iter().zip(connects) {
101+
tokio::spawn(async move {
102+
let id = req.id();
103+
match connect.await {
104+
Ok((out_tx, out_rx)) => {
105+
let in_port = out_tx.port_allocator().allocate().await;
106+
match req.accept_from(in_port).await {
107+
Ok((in_tx, in_rx)) => {
108+
spawn_forward(id, out_rx, in_tx);
109+
spawn_forward(id, in_rx, out_tx);
110+
}
111+
Err(err) => {
112+
tracing::debug!("port forwarding for id {id} failed to accept: {err}");
113+
}
114+
}
115+
}
116+
Err(err) => {
117+
tracing::debug!("port forwarding for id {id} failed to connect: {err}");
118+
req.reject(matches!(
119+
err,
120+
ConnectError::LocalPortsExhausted | ConnectError::RemotePortsExhausted
121+
))
122+
.await;
123+
}
124+
}
125+
});
126+
}
127+
}
128+
129+
// End.
130+
None => break,
131+
}
132+
}
133+
134+
Ok(total.0)
135+
}

remoc/src/chmux/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ mod any_storage;
1919
mod cfg;
2020
mod client;
2121
mod credit;
22+
mod forward;
2223
mod listener;
2324
mod msg;
2425
mod mux;
@@ -29,6 +30,7 @@ mod sender;
2930
pub use any_storage::{AnyBox, AnyEntry, AnyStorage};
3031
pub use cfg::{Cfg, PortsExhausted};
3132
pub use client::{Client, Connect, ConnectError};
33+
pub use forward::{forward, ForwardError};
3234
pub use listener::{Listener, ListenerError, ListenerStream, Request};
3335
pub use mux::ChMux;
3436
pub use port_allocator::{PortAllocator, PortNumber, PortReq};

remoc/src/robj/lazy_blob/fw_bin.rs

+4-35
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use serde::{ser, Deserialize, Serialize};
44
use std::sync::Mutex;
55

6-
use crate::{chmux::Received, rch::bin};
6+
use crate::{chmux, rch::bin};
77

88
/// A chmux sender that can be remotely sent and forwarded.
99
pub(crate) struct Sender {
@@ -44,43 +44,12 @@ impl Serialize for Sender {
4444
(Some(bin_tx), None) => {
4545
let (bin_fw_tx, bin_fw_rx) = bin::channel();
4646
tokio::spawn(async move {
47-
let mut bin_tx = if let Ok(bin_tx) = bin_tx.into_inner().await { bin_tx } else { return };
48-
let mut bin_fw_rx =
49-
if let Ok(bin_fw_rx) = bin_fw_rx.into_inner().await { bin_fw_rx } else { return };
47+
let Ok(mut bin_tx) = bin_tx.into_inner().await else { return };
48+
let Ok(mut bin_fw_rx) = bin_fw_rx.into_inner().await else { return };
5049

5150
// No error handling is performed, because complete transmission of
5251
// data is verified by size.
53-
loop {
54-
match bin_fw_rx.recv_any().await {
55-
Ok(Some(Received::Data(data))) => {
56-
if bin_tx.send(data.into()).await.is_err() {
57-
return;
58-
}
59-
}
60-
Ok(Some(Received::Chunks)) => {
61-
let mut chunk_tx = bin_tx.send_chunks();
62-
loop {
63-
match bin_fw_rx.recv_chunk().await {
64-
Ok(Some(chunk)) => {
65-
chunk_tx = match chunk_tx.send(chunk).await {
66-
Ok(chunk_tx) => chunk_tx,
67-
Err(_) => return,
68-
};
69-
}
70-
Ok(None) => {
71-
if chunk_tx.finish().await.is_err() {
72-
return;
73-
}
74-
break;
75-
}
76-
Err(_) => return,
77-
}
78-
}
79-
}
80-
Ok(None) => break,
81-
_ => return,
82-
}
83-
}
52+
let _ = chmux::forward(&mut bin_fw_rx, &mut bin_tx).await;
8453
});
8554
TransportedSender { bin_tx: bin_fw_tx }.serialize(serializer)
8655
}

0 commit comments

Comments
 (0)