From 37c856bc0a664e7e6ba336cf3cea51138f627155 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Stuczy=C5=84ski?= Date: Wed, 6 Aug 2025 17:10:40 +0100 Subject: [PATCH 1/2] feat: introduce additional checks when attempting to send to bounded channels or to a fallible gateway --- .../client-core/src/client/mix_traffic/mod.rs | 60 ++++++++++++------- .../input_message_listener.rs | 9 ++- .../retransmission_request_listener.rs | 9 ++- .../real_messages_control/message_handler.rs | 21 ++++--- .../real_traffic_stream.rs | 36 +++++++---- common/task/src/spawn.rs | 2 + 6 files changed, 91 insertions(+), 46 deletions(-) diff --git a/common/client-core/src/client/mix_traffic/mod.rs b/common/client-core/src/client/mix_traffic/mod.rs index be7a517f574..9839d74473b 100644 --- a/common/client-core/src/client/mix_traffic/mod.rs +++ b/common/client-core/src/client/mix_traffic/mod.rs @@ -96,35 +96,59 @@ impl MixTrafficController { mut mix_packets: Vec, ) -> Result<(), ErasedGatewayError> { debug_assert!(!mix_packets.is_empty()); - - let result = if mix_packets.len() == 1 { + let send_future = if mix_packets.len() == 1 { // SAFETY: we just checked we have one packet #[allow(clippy::unwrap_used)] let mix_packet = mix_packets.pop().unwrap(); - self.gateway_transceiver.send_mix_packet(mix_packet).await + self.gateway_transceiver.send_mix_packet(mix_packet) } else { - self.gateway_transceiver - .batch_send_mix_packets(mix_packets) - .await + self.gateway_transceiver.batch_send_mix_packets(mix_packets) }; - if result.is_err() { - self.consecutive_gateway_failure_count += 1; - } else { - trace!("We *might* have managed to forward sphinx packet(s) to the gateway!"); - self.consecutive_gateway_failure_count = 0; + tokio::select! { + biased; + _ = self.task_client.recv() => { + trace!("received shutdown while handling messages"); + Ok(()) + } + result = send_future => { + if result.is_err() { + self.consecutive_gateway_failure_count += 1; + } else { + trace!("We *might* have managed to forward sphinx packet(s) to the gateway!"); + self.consecutive_gateway_failure_count = 0; + } + + result + } } + } - result + async fn on_client_request(&mut self, client_request: ClientRequest) { + tokio::select! { + biased; + _ = self.task_client.recv() => { + trace!("received shutdown while handling client request"); + } + result = self.gateway_transceiver.send_client_request(client_request) => { + if let Err(err) = result { + error!("Failed to send client request: {err}") + } + } + } } pub fn start(mut self) { spawn_future!( async move { debug!("Started MixTrafficController with graceful shutdown support"); - while !self.task_client.is_shutdown() { tokio::select! { + biased; + _ = self.task_client.recv() => { + tracing::trace!("MixTrafficController: Received shutdown"); + break; + } mix_packets = self.mix_rx.recv() => match mix_packets { Some(mix_packets) => { if let Err(err) = self.on_messages(mix_packets).await { @@ -147,23 +171,15 @@ impl MixTrafficController { }, client_request = self.client_rx.recv() => match client_request { Some(client_request) => { - match self.gateway_transceiver.send_client_request(client_request).await { - Ok(_) => (), - Err(e) => error!("Failed to send client request: {e}"), - }; + self.on_client_request(client_request).await; }, None => { tracing::trace!("MixTrafficController, client request channel closed"); } }, - _ = self.task_client.recv() => { - tracing::trace!("MixTrafficController: Received shutdown"); - break; - } } } self.task_client.recv_timeout().await; - tracing::debug!("MixTrafficController: Exiting"); }, "MixTrafficController" diff --git a/common/client-core/src/client/real_messages_control/acknowledgement_control/input_message_listener.rs b/common/client-core/src/client/real_messages_control/acknowledgement_control/input_message_listener.rs index 8185eb5a0ae..f47f050b3b4 100644 --- a/common/client-core/src/client/real_messages_control/acknowledgement_control/input_message_listener.rs +++ b/common/client-core/src/client/real_messages_control/acknowledgement_control/input_message_listener.rs @@ -226,6 +226,11 @@ where while !self.task_client.is_shutdown() { tokio::select! { + biased; + _ = self.task_client.recv() => { + tracing::trace!("InputMessageListener: Received shutdown"); + break; + } input_msg = self.input_receiver.recv() => match input_msg { Some(input_msg) => { self.on_input_message(input_msg).await; @@ -235,9 +240,7 @@ where break; } }, - _ = self.task_client.recv() => { - tracing::trace!("InputMessageListener: Received shutdown"); - } + } } self.task_client.recv_timeout().await; diff --git a/common/client-core/src/client/real_messages_control/acknowledgement_control/retransmission_request_listener.rs b/common/client-core/src/client/real_messages_control/acknowledgement_control/retransmission_request_listener.rs index aab826de7a4..0b066d0cc0b 100644 --- a/common/client-core/src/client/real_messages_control/acknowledgement_control/retransmission_request_listener.rs +++ b/common/client-core/src/client/real_messages_control/acknowledgement_control/retransmission_request_listener.rs @@ -179,6 +179,11 @@ where while !self.task_client.is_shutdown() { tokio::select! { + biased; + _ = self.task_client.recv() => { + tracing::trace!("RetransmissionRequestListener: Received shutdown"); + break; + } timed_out_ack = self.request_receiver.next() => match timed_out_ack { Some(timed_out_ack) => self.on_retransmission_request(timed_out_ack, packet_type).await, None => { @@ -186,9 +191,7 @@ where break; } }, - _ = self.task_client.recv() => { - tracing::trace!("RetransmissionRequestListener: Received shutdown"); - } + } } self.task_client.recv_timeout().await; diff --git a/common/client-core/src/client/real_messages_control/message_handler.rs b/common/client-core/src/client/real_messages_control/message_handler.rs index 942d88c77a8..1dca40cf7e7 100644 --- a/common/client-core/src/client/real_messages_control/message_handler.rs +++ b/common/client-core/src/client/real_messages_control/message_handler.rs @@ -548,6 +548,7 @@ where pending_acks.push(pending_ack); } + drop(topology_permit); self.insert_pending_acks(pending_acks); self.forward_messages(real_messages, lane).await; @@ -730,17 +731,21 @@ where // tells real message sender (with the poisson timer) to send this to the mix network pub(crate) async fn forward_messages( - &self, + &mut self, messages: Vec, transmission_lane: TransmissionLane, ) { - if let Err(err) = self - .real_message_sender - .send((messages, transmission_lane)) - .await - { - if !self.task_client.is_shutdown_poll() { - error!("Failed to forward messages to the real message sender: {err}"); + tokio::select! { + biased; + _ = self.task_client.recv() => { + trace!("received shutdown while attempting to forward mixnet messages"); + } + sending_res = self.real_message_sender.send((messages, transmission_lane)) => { + if sending_res.is_err() { + error!( + "failed to forward mixnet messages due to closed channel (outside of shutdown!)" + ); + } } } } diff --git a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs index d24947c9d24..7cbf3b6ad65 100644 --- a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs +++ b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs @@ -280,17 +280,33 @@ where } }; - if let Err(err) = self.mix_tx.send(vec![next_message]).await { - if !self.task_client.is_shutdown_poll() { - tracing::error!("Failed to send: {err}"); + let sending_res = tokio::select! { + biased; + _ = self.task_client.recv() => { + trace!("received shutdown signal while attempting to send mix message"); + return + } + sending_res = self.mix_tx.send(vec![next_message]) => { + sending_res + } + }; + + match sending_res { + Err(_) => { + if !self.task_client.is_shutdown_poll() { + tracing::error!( + "failed to send mixnet packet due to closed channel (outside of shutdown!)" + ); + } + } + Ok(_) => { + let event = if fragment_id.is_some() { + PacketStatisticsEvent::RealPacketSent(packet_size) + } else { + PacketStatisticsEvent::CoverPacketSent(packet_size) + }; + self.stats_tx.report(event.into()); } - } else { - let event = if fragment_id.is_some() { - PacketStatisticsEvent::RealPacketSent(packet_size) - } else { - PacketStatisticsEvent::CoverPacketSent(packet_size) - }; - self.stats_tx.report(event.into()); } // notify ack controller about sending our message only after we actually managed to push it diff --git a/common/task/src/spawn.rs b/common/task/src/spawn.rs index 16f77299d17..e0fe98c5dea 100644 --- a/common/task/src/spawn.rs +++ b/common/task/src/spawn.rs @@ -10,6 +10,7 @@ where } #[cfg(not(target_arch = "wasm32"))] +#[track_caller] pub fn spawn(future: F) where F: Future + Send + 'static, @@ -18,6 +19,7 @@ where tokio::spawn(future); } +#[track_caller] pub fn spawn_with_report_error(future: F, mut shutdown: TaskClient) where F: Future> + Send + 'static, From 057e4ce8c91e97884e4251cd4f74805ea8e83f8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Stuczy=C5=84ski?= Date: Fri, 8 Aug 2025 10:06:38 +0100 Subject: [PATCH 2/2] return error rather than panic when merging socket during shutdown --- common/client-libs/gateway-client/src/client/mod.rs | 2 +- common/client-libs/gateway-client/src/socket_state.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/client-libs/gateway-client/src/client/mod.rs b/common/client-libs/gateway-client/src/client/mod.rs index 612349315f4..de90b3ad35d 100644 --- a/common/client-libs/gateway-client/src/client/mod.rs +++ b/common/client-libs/gateway-client/src/client/mod.rs @@ -201,7 +201,7 @@ impl GatewayClient { #[cfg(not(target_arch = "wasm32"))] pub async fn establish_connection(&mut self) -> Result<(), GatewayClientError> { debug!( - "Attemting to establish connection to gateway at: {}", + "Attempting to establish connection to gateway at: {}", self.gateway_address ); let (ws_stream, _) = connect_async( diff --git a/common/client-libs/gateway-client/src/socket_state.rs b/common/client-libs/gateway-client/src/socket_state.rs index b2a1e6ebb03..5489ec36287 100644 --- a/common/client-libs/gateway-client/src/socket_state.rs +++ b/common/client-libs/gateway-client/src/socket_state.rs @@ -337,7 +337,7 @@ impl PartiallyDelegatedHandle { // check if the split stream didn't error out let receive_res = stream_receiver .try_recv() - .expect("stream sender was somehow dropped without sending anything!"); + .map_err(|_| GatewayClientError::ConnectionAbruptlyClosed)?; if let Some(res) = receive_res { let _res = res?;