From cf0588769f2423cb93bf29819ab619f6a0f0e347 Mon Sep 17 00:00:00 2001 From: Mingwei Samuel Date: Mon, 23 Sep 2024 10:16:18 -0700 Subject: [PATCH 1/4] build: use different target dir for rust-analyzer to prevent thrashing with cargo --- .vscode/settings.json | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 1057a431c935..978d549c04ad 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -13,10 +13,16 @@ } } ], + "rust-analyzer.runnables.extraTestBinaryArgs": [ + "--nocapture" + ], + "rust-analyzer.cargo.features": [ + "python" + ], + "rust-analyzer.server.extraEnv": { + "CARGO_TARGET_DIR": "${workspaceFolder}/target/rust-analyzer", + }, "files.watcherExclude": { "**/target": true }, - "rust-analyzer.cargo.features": [ - "python" - ] } From 006890f10a50befc90b1cacc76a9512298748389 Mon Sep 17 00:00:00 2001 From: Mingwei Samuel Date: Thu, 5 Sep 2024 15:03:36 -0700 Subject: [PATCH 2/4] fix(hydroflow): cleanup temp tcp networking code, fix race condition fix #1458 only spawn one task to prevent races between tasks --- hydroflow/src/util/tcp.rs | 168 ++++++++++++++++++++++---------------- 1 file changed, 96 insertions(+), 72 deletions(-) diff --git a/hydroflow/src/util/tcp.rs b/hydroflow/src/util/tcp.rs index 5f1cab36003b..77f1606b3c4c 100644 --- a/hydroflow/src/util/tcp.rs +++ b/hydroflow/src/util/tcp.rs @@ -1,16 +1,15 @@ #![cfg(not(target_arch = "wasm32"))] -use std::cell::RefCell; use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::HashMap; use std::net::SocketAddr; -use std::pin::pin; -use std::rc::Rc; use futures::{SinkExt, StreamExt}; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::{TcpListener, TcpSocket, TcpStream}; +use tokio::select; use tokio::task::spawn_local; +use tokio_stream::StreamMap; use tokio_util::codec::{ BytesCodec, Decoder, Encoder, FramedRead, FramedWrite, LengthDelimitedCodec, LinesCodec, }; @@ -74,6 +73,7 @@ pub type TcpFramedSink = Sender<(T, SocketAddr)>; pub type TcpFramedStream = Receiver::Item, SocketAddr), ::Error>>; +// TODO(mingwei): this temporary code should be replaced with a properly thought out networking system. /// Create a listening tcp socket, and then as new connections come in, receive their data and forward it to a queue. pub async fn bind_tcp>( endpoint: SocketAddr, @@ -83,60 +83,64 @@ pub async fn bind_tcp> let bound_endpoint = listener.local_addr()?; - let (tx_egress, mut rx_egress) = unsync_channel(None); - let (tx_ingress, rx_ingress) = unsync_channel(None); - - let clients = Rc::new(RefCell::new(HashMap::new())); - - spawn_local({ - let clients = clients.clone(); - - async move { - while let Some((payload, addr)) = rx_egress.next().await { - let client = clients.borrow_mut().remove(&addr); - - if let Some(mut sender) = client { - let _ = SinkExt::send(&mut sender, payload).await; - clients.borrow_mut().insert(addr, sender); - } - } - } - }); + let (send_egress, mut recv_egress) = unsync_channel::<(T, SocketAddr)>(None); + let (send_ingres, recv_ingres) = unsync_channel(None); spawn_local(async move { + let send_ingress = send_ingres; + let mut peers_send = HashMap::new(); + let mut peers_recv = StreamMap::new(); + loop { - let (stream, peer_addr) = if let Ok((stream, _)) = listener.accept().await { - if let Ok(peer_addr) = stream.peer_addr() { - (stream, peer_addr) - } else { - continue; + // Calling methods in a loop, futures must be cancel-safe. + select! { + biased; + // Accept new clients. + new_peer = listener.accept() => { + let Ok((stream, _addr)) = new_peer else { + continue; + }; + let Ok(peer_addr) = stream.peer_addr() else { + continue; + }; + let (peer_send, peer_recv) = tcp_framed(stream, codec.clone()); + + // TODO: Using peer_addr here as the key is a little bit sketchy. + // It's possible that a peer could send a message, disconnect, then another peer connects from the + // same IP address (and the same src port), and then the response could be sent to that new client. + // This can be solved by using monotonically increasing IDs for each new peer, but would break the + // similarity with the UDP versions of this function. + peers_send.insert(peer_addr, peer_send); + peers_recv.insert(peer_addr, peer_recv); } - } else { - continue; - }; - - let mut tx_ingress = tx_ingress.clone(); - - let (send, recv) = tcp_framed(stream, codec.clone()); - - // TODO: Using peer_addr here as the key is a little bit sketchy. - // It's possible that a client could send a message, disconnect, then another client connects from the same IP address (and the same src port), and then the response could be sent to that new client. - // This can be solved by using monotonically increasing IDs for each new client, but would break the similarity with the UDP versions of this function. - clients.borrow_mut().insert(peer_addr, send); - - spawn_local({ - let clients = clients.clone(); - async move { - let mapped = recv.map(|x| Ok(x.map(|x| (x, peer_addr)))); - let _ = tx_ingress.send_all(&mut pin!(mapped)).await; - - clients.borrow_mut().remove(&peer_addr); + // Send outgoing messages. + msg_send = recv_egress.next() => { + let Some((payload, peer_addr)) = msg_send else { + continue; + }; + let Some(stream) = peers_send.get_mut(&peer_addr) else { + eprintln!("Dropping message to non-connected peer: {}", peer_addr); + continue; + }; + if let Err(_err) = SinkExt::send(stream, payload).await { + eprintln!("Failed to send message to peer: {}", peer_addr); + }; + } + // Receive incoming messages. + msg_recv = peers_recv.next(), if !peers_recv.is_empty() => { + let Some((peer_addr, payload_result)) = msg_recv else { + eprintln!("Error receiving message"); + continue; + }; + if let Err(err) = send_ingress.send(payload_result.map(|payload| (payload, peer_addr))).await { + eprintln!("Error passing along received message: {:?}", err); + } } - }); + } } }); - Ok((tx_egress, rx_ingress, bound_endpoint)) + Ok((send_egress, recv_ingres, bound_endpoint)) } /// The inverse of [`bind_tcp`]. @@ -147,34 +151,54 @@ pub async fn bind_tcp> pub fn connect_tcp>( codec: Codec, ) -> (TcpFramedSink, TcpFramedStream) { - let (tx_egress, mut rx_egress) = unsync_channel(None); - let (tx_ingress, rx_ingress) = unsync_channel(None); + let (send_egress, mut recv_egress) = unsync_channel(None); + let (send_ingres, recv_ingres) = unsync_channel(None); spawn_local(async move { - let mut streams = HashMap::new(); - - while let Some((payload, addr)) = rx_egress.next().await { - let stream = match streams.entry(addr) { - Occupied(entry) => entry.into_mut(), - Vacant(entry) => { - let socket = TcpSocket::new_v4().unwrap(); - let stream = socket.connect(addr).await.unwrap(); - - let (send, recv) = tcp_framed(stream, codec.clone()); + let send_ingres = send_ingres; + let mut peers_send = HashMap::new(); + let mut peers_recv = StreamMap::new(); - let mut tx_ingress = tx_ingress.clone(); - spawn_local(async move { - let mapped = recv.map(|x| Ok(x.map(|x| (x, addr)))); - let _ = tx_ingress.send_all(&mut pin!(mapped)).await; - }); - - entry.insert(send) + loop { + // Calling methods in a loop, futures must be cancel-safe. + select! { + biased; + // Send outgoing messages. + msg_send = recv_egress.next() => { + let Some((payload, peer_addr)) = msg_send else { + continue; + }; + + let stream = match peers_send.entry(peer_addr) { + Occupied(entry) => entry.into_mut(), + Vacant(entry) => { + let socket = TcpSocket::new_v4().unwrap(); + let stream = socket.connect(peer_addr).await.unwrap(); + + let (peer_send, peer_recv) = tcp_framed(stream, codec.clone()); + + peers_recv.insert(peer_addr, peer_recv); + entry.insert(peer_send) + } + }; + + if let Err(_err) = stream.send(payload).await { + eprintln!("Failed to send message to peer: {}", peer_addr); + } } - }; - - let _ = stream.send(payload).await; + // Receive incoming messages. + msg_recv = peers_recv.next(), if !peers_recv.is_empty() => { + let Some((peer_addr, payload_result)) = msg_recv else { + // End of stream. + break; + }; + if let Err(err) = send_ingres.send(payload_result.map(|payload| (payload, peer_addr))).await { + eprintln!("Error passing along received message: {:?}", err); + } + } + } } }); - (tx_egress, rx_ingress) + (send_egress, recv_ingres) } From 3803af4880c125c6dca66e075be714ec9edc0ab3 Mon Sep 17 00:00:00 2001 From: Mingwei Samuel Date: Mon, 23 Sep 2024 10:40:21 -0700 Subject: [PATCH 3/4] fixup! fix(hydroflow): cleanup temp tcp networking code, fix race condition fix #1458 --- hydroflow/src/util/tcp.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/hydroflow/src/util/tcp.rs b/hydroflow/src/util/tcp.rs index 77f1606b3c4c..a1bd365e0cc6 100644 --- a/hydroflow/src/util/tcp.rs +++ b/hydroflow/src/util/tcp.rs @@ -119,21 +119,20 @@ pub async fn bind_tcp> continue; }; let Some(stream) = peers_send.get_mut(&peer_addr) else { - eprintln!("Dropping message to non-connected peer: {}", peer_addr); + tracing::warn!("Dropping message to non-connected peer: {}", peer_addr); continue; }; if let Err(_err) = SinkExt::send(stream, payload).await { - eprintln!("Failed to send message to peer: {}", peer_addr); + tracing::warn!("Failed to send message to peer: {}", peer_addr); }; } // Receive incoming messages. msg_recv = peers_recv.next(), if !peers_recv.is_empty() => { let Some((peer_addr, payload_result)) = msg_recv else { - eprintln!("Error receiving message"); - continue; + unreachable!(); // => `peers_recv.is_empty()`. }; if let Err(err) = send_ingress.send(payload_result.map(|payload| (payload, peer_addr))).await { - eprintln!("Error passing along received message: {:?}", err); + tracing::warn!("Error passing along received message: {:?}", err); } } } @@ -183,17 +182,16 @@ pub fn connect_tcp>( }; if let Err(_err) = stream.send(payload).await { - eprintln!("Failed to send message to peer: {}", peer_addr); + tracing::warn!("Failed to send message to peer: {}", peer_addr); } } // Receive incoming messages. msg_recv = peers_recv.next(), if !peers_recv.is_empty() => { let Some((peer_addr, payload_result)) = msg_recv else { - // End of stream. - break; + unreachable!(); // => `peers_recv.is_empty()`. }; if let Err(err) = send_ingres.send(payload_result.map(|payload| (payload, peer_addr))).await { - eprintln!("Error passing along received message: {:?}", err); + tracing::warn!("Error passing along received message: {:?}", err); } } } From b1f96fddb7ac138f0f797b06f5d9e6141889e7da Mon Sep 17 00:00:00 2001 From: Mingwei Samuel Date: Mon, 23 Sep 2024 10:41:14 -0700 Subject: [PATCH 4/4] fixup! fixup! fix(hydroflow): cleanup temp tcp networking code, fix race condition fix #1458 --- hydroflow/src/util/tcp.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hydroflow/src/util/tcp.rs b/hydroflow/src/util/tcp.rs index a1bd365e0cc6..d94a5826c83f 100644 --- a/hydroflow/src/util/tcp.rs +++ b/hydroflow/src/util/tcp.rs @@ -123,7 +123,7 @@ pub async fn bind_tcp> continue; }; if let Err(_err) = SinkExt::send(stream, payload).await { - tracing::warn!("Failed to send message to peer: {}", peer_addr); + tracing::error!("Failed to send message to peer: {}", peer_addr); }; } // Receive incoming messages. @@ -132,7 +132,7 @@ pub async fn bind_tcp> unreachable!(); // => `peers_recv.is_empty()`. }; if let Err(err) = send_ingress.send(payload_result.map(|payload| (payload, peer_addr))).await { - tracing::warn!("Error passing along received message: {:?}", err); + tracing::error!("Error passing along received message: {:?}", err); } } } @@ -182,7 +182,7 @@ pub fn connect_tcp>( }; if let Err(_err) = stream.send(payload).await { - tracing::warn!("Failed to send message to peer: {}", peer_addr); + tracing::error!("Failed to send message to peer: {}", peer_addr); } } // Receive incoming messages. @@ -191,7 +191,7 @@ pub fn connect_tcp>( unreachable!(); // => `peers_recv.is_empty()`. }; if let Err(err) = send_ingres.send(payload_result.map(|payload| (payload, peer_addr))).await { - tracing::warn!("Error passing along received message: {:?}", err); + tracing::error!("Error passing along received message: {:?}", err); } } }