Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 38 additions & 22 deletions common/client-core/src/client/mix_traffic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,35 +96,59 @@ impl MixTrafficController {
mut mix_packets: Vec<MixPacket>,
) -> Result<(), ErasedGatewayError> {
debug_assert!(!mix_packets.is_empty());

let result = if mix_packets.len() == 1 {
let send_future = if mix_packets.len() == 1 {
// SAFETY: we just checked we have one packet
#[allow(clippy::unwrap_used)]
let mix_packet = mix_packets.pop().unwrap();
self.gateway_transceiver.send_mix_packet(mix_packet).await
self.gateway_transceiver.send_mix_packet(mix_packet)
} else {
self.gateway_transceiver
.batch_send_mix_packets(mix_packets)
.await
self.gateway_transceiver.batch_send_mix_packets(mix_packets)
};

if result.is_err() {
self.consecutive_gateway_failure_count += 1;
} else {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown while handling messages");
Ok(())
}
result = send_future => {
if result.is_err() {
self.consecutive_gateway_failure_count += 1;
} else {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
}

result
}
}
}

result
async fn on_client_request(&mut self, client_request: ClientRequest) {
tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown while handling client request");
}
result = self.gateway_transceiver.send_client_request(client_request) => {
if let Err(err) = result {
error!("Failed to send client request: {err}")
}
}
}
}

pub fn start(mut self) {
spawn_future!(
async move {
debug!("Started MixTrafficController with graceful shutdown support");

while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.task_client.recv() => {
tracing::trace!("MixTrafficController: Received shutdown");
break;
}
mix_packets = self.mix_rx.recv() => match mix_packets {
Some(mix_packets) => {
if let Err(err) = self.on_messages(mix_packets).await {
Expand All @@ -147,23 +171,15 @@ impl MixTrafficController {
},
client_request = self.client_rx.recv() => match client_request {
Some(client_request) => {
match self.gateway_transceiver.send_client_request(client_request).await {
Ok(_) => (),
Err(e) => error!("Failed to send client request: {e}"),
};
self.on_client_request(client_request).await;
},
None => {
tracing::trace!("MixTrafficController, client request channel closed");
}
},
_ = self.task_client.recv() => {
tracing::trace!("MixTrafficController: Received shutdown");
break;
}
}
}
self.task_client.recv_timeout().await;

tracing::debug!("MixTrafficController: Exiting");
},
"MixTrafficController"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@ where

while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.task_client.recv() => {
tracing::trace!("InputMessageListener: Received shutdown");
break;
}
input_msg = self.input_receiver.recv() => match input_msg {
Some(input_msg) => {
self.on_input_message(input_msg).await;
Expand All @@ -235,9 +240,7 @@ where
break;
}
},
_ = self.task_client.recv() => {
tracing::trace!("InputMessageListener: Received shutdown");
}

}
}
self.task_client.recv_timeout().await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,19 @@ where

while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.task_client.recv() => {
tracing::trace!("RetransmissionRequestListener: Received shutdown");
break;
}
timed_out_ack = self.request_receiver.next() => match timed_out_ack {
Some(timed_out_ack) => self.on_retransmission_request(timed_out_ack, packet_type).await,
None => {
tracing::trace!("RetransmissionRequestListener: Stopping since channel closed");
break;
}
},
_ = self.task_client.recv() => {
tracing::trace!("RetransmissionRequestListener: Received shutdown");
}

}
}
self.task_client.recv_timeout().await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ where
pending_acks.push(pending_ack);
}

drop(topology_permit);
self.insert_pending_acks(pending_acks);
self.forward_messages(real_messages, lane).await;

Expand Down Expand Up @@ -730,17 +731,21 @@ where

// tells real message sender (with the poisson timer) to send this to the mix network
pub(crate) async fn forward_messages(
&self,
&mut self,
messages: Vec<RealMessage>,
transmission_lane: TransmissionLane,
) {
if let Err(err) = self
.real_message_sender
.send((messages, transmission_lane))
.await
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to forward messages to the real message sender: {err}");
tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown while attempting to forward mixnet messages");
}
sending_res = self.real_message_sender.send((messages, transmission_lane)) => {
if sending_res.is_err() {
error!(
"failed to forward mixnet messages due to closed channel (outside of shutdown!)"
);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,17 +280,33 @@ where
}
};

if let Err(err) = self.mix_tx.send(vec![next_message]).await {
if !self.task_client.is_shutdown_poll() {
tracing::error!("Failed to send: {err}");
let sending_res = tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown signal while attempting to send mix message");
return
}
sending_res = self.mix_tx.send(vec![next_message]) => {
sending_res
}
};

match sending_res {
Err(_) => {
if !self.task_client.is_shutdown_poll() {
tracing::error!(
"failed to send mixnet packet due to closed channel (outside of shutdown!)"
);
}
}
Ok(_) => {
let event = if fragment_id.is_some() {
PacketStatisticsEvent::RealPacketSent(packet_size)
} else {
PacketStatisticsEvent::CoverPacketSent(packet_size)
};
self.stats_tx.report(event.into());
}
} else {
let event = if fragment_id.is_some() {
PacketStatisticsEvent::RealPacketSent(packet_size)
} else {
PacketStatisticsEvent::CoverPacketSent(packet_size)
};
self.stats_tx.report(event.into());
}

// notify ack controller about sending our message only after we actually managed to push it
Expand Down
2 changes: 1 addition & 1 deletion common/client-libs/gateway-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl<C, St> GatewayClient<C, St> {
#[cfg(not(target_arch = "wasm32"))]
pub async fn establish_connection(&mut self) -> Result<(), GatewayClientError> {
debug!(
"Attemting to establish connection to gateway at: {}",
"Attempting to establish connection to gateway at: {}",
self.gateway_address
);
let (ws_stream, _) = connect_async(
Expand Down
2 changes: 1 addition & 1 deletion common/client-libs/gateway-client/src/socket_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ impl PartiallyDelegatedHandle {
// check if the split stream didn't error out
let receive_res = stream_receiver
.try_recv()
.expect("stream sender was somehow dropped without sending anything!");
.map_err(|_| GatewayClientError::ConnectionAbruptlyClosed)?;

if let Some(res) = receive_res {
let _res = res?;
Expand Down
2 changes: 2 additions & 0 deletions common/task/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ where
}

#[cfg(not(target_arch = "wasm32"))]
#[track_caller]
pub fn spawn<F>(future: F)
where
F: Future + Send + 'static,
Expand All @@ -18,6 +19,7 @@ where
tokio::spawn(future);
}

#[track_caller]
pub fn spawn_with_report_error<F, T, E>(future: F, mut shutdown: TaskClient)
where
F: Future<Output = Result<T, E>> + Send + 'static,
Expand Down
Loading