Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 14 additions & 10 deletions clients/native/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use nym_client_core::client::base_client::{
BaseClientBuilder, ClientInput, ClientOutput, ClientState,
};
use nym_sphinx::params::PacketType;
use nym_task::TaskHandle;
use nym_task::ShutdownManager;
use nym_validator_client::QueryHttpRpcNyxdClient;
use std::error::Error;
use std::path::PathBuf;
Expand All @@ -29,6 +29,8 @@ pub struct SocketClient {

/// Optional path to a .json file containing standalone network details.
custom_mixnet: Option<PathBuf>,

shutdown_manager: ShutdownManager,
}

impl SocketClient {
Expand All @@ -40,6 +42,7 @@ impl SocketClient {
SocketClient {
config,
custom_mixnet,
shutdown_manager: Default::default(),
}
}

Expand All @@ -49,7 +52,7 @@ impl SocketClient {
client_output: ClientOutput,
client_state: ClientState,
self_address: &Recipient,
task_client: nym_task::TaskClient,
shutdown_token: nym_task::ShutdownToken,
packet_type: PacketType,
) {
info!("Starting websocket listener...");
Expand Down Expand Up @@ -77,24 +80,24 @@ impl SocketClient {
shared_lane_queue_lengths,
reply_controller_sender,
Some(packet_type),
task_client.fork("websocket_handler"),
shutdown_token.clone(),
);

websocket::Listener::new(
config.socket.host,
config.socket.listening_port,
task_client.with_suffix("websocket_listener"),
shutdown_token.child_token(),
)
.start(websocket_handler);
}

/// blocking version of `start_socket` method. Will run forever (or until SIGINT is sent)
pub async fn run_socket_forever(self) -> Result<(), Box<dyn Error + Send + Sync>> {
let shutdown = self.start_socket().await?;
let mut shutdown = self.start_socket().await?;

let res = shutdown.wait_for_shutdown().await;
shutdown.run_until_shutdown().await;
log::info!("Stopping nym-client");
res
Ok(())
}

async fn initialise_storage(&self) -> Result<OnDiskPersistent, ClientError> {
Expand All @@ -119,6 +122,7 @@ impl SocketClient {

let mut base_client =
BaseClientBuilder::new(self.config().base(), storage, dkg_query_client)
.with_shutdown(self.shutdown_manager.shutdown_tracker_owned())
.with_user_agent(user_agent);

if let Some(custom_mixnet) = &self.custom_mixnet {
Expand All @@ -128,7 +132,7 @@ impl SocketClient {
Ok(base_client)
}

pub async fn start_socket(self) -> Result<TaskHandle, ClientError> {
pub async fn start_socket(self) -> Result<ShutdownManager, ClientError> {
if !self.config.socket.socket_type.is_websocket() {
return Err(ClientError::InvalidSocketMode);
}
Expand All @@ -147,13 +151,13 @@ impl SocketClient {
client_output,
client_state,
&self_address,
started_client.task_handle.get_handle(),
self.shutdown_manager.child_shutdown_token(),
packet_type,
);

info!("Client startup finished!");
info!("The address of this client is: {self_address}");

Ok(started_client.task_handle)
Ok(self.shutdown_manager)
}
}
48 changes: 21 additions & 27 deletions clients/native/src/websocket/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use nym_sphinx::receiver::ReconstructedMessage;
use nym_task::connections::{
ConnectionCommand, ConnectionCommandSender, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::Instant;
Expand All @@ -44,7 +44,7 @@ pub(crate) struct HandlerBuilder {
lane_queue_lengths: LaneQueueLengths,
reply_controller_sender: ReplyControllerSender,
packet_type: Option<PacketType>,
task_client: TaskClient,
shutdown_token: ShutdownToken,
}

impl HandlerBuilder {
Expand All @@ -57,7 +57,7 @@ impl HandlerBuilder {
lane_queue_lengths: LaneQueueLengths,
reply_controller_sender: ReplyControllerSender,
packet_type: Option<PacketType>,
task_client: TaskClient,
shutdown_token: ShutdownToken,
) -> Self {
Self {
msg_input,
Expand All @@ -67,14 +67,13 @@ impl HandlerBuilder {
lane_queue_lengths,
reply_controller_sender,
packet_type,
task_client,
shutdown_token,
}
}

// TODO: make sure we only ever have one active handler
pub fn create_active_handler(&self) -> Handler {
let mut task_client = self.task_client.fork("active_handler");
task_client.disarm();
let shutdown_token = self.shutdown_token.clone();
Handler {
msg_input: self.msg_input.clone(),
client_connection_tx: self.client_connection_tx.clone(),
Expand All @@ -85,7 +84,7 @@ impl HandlerBuilder {
lane_queue_lengths: self.lane_queue_lengths.clone(),
reply_controller_sender: self.reply_controller_sender.clone(),
packet_type: self.packet_type,
task_client,
shutdown_token,
}
}
}
Expand All @@ -100,19 +99,14 @@ pub(crate) struct Handler {
lane_queue_lengths: LaneQueueLengths,
reply_controller_sender: ReplyControllerSender,
packet_type: Option<PacketType>,
task_client: TaskClient,
shutdown_token: ShutdownToken,
}

impl Drop for Handler {
fn drop(&mut self) {
if let Err(err) = self
let _ = self
.buffer_requester
.unbounded_send(ReceivedBufferMessage::ReceiverDisconnect)
{
if !self.task_client.is_shutdown_poll() {
error!("failed to disconnect the receiver from the buffer: {err}");
}
}
.unbounded_send(ReceivedBufferMessage::ReceiverDisconnect);
}
}

Expand Down Expand Up @@ -142,7 +136,7 @@ impl Handler {
{
Ok(length) => length,
Err(err) => {
if !self.task_client.is_shutdown_poll() {
if !self.shutdown_token.is_cancelled() {
error!(
"Failed to get reply queue length for connection {connection_id}: {err}"
);
Expand Down Expand Up @@ -192,7 +186,7 @@ impl Handler {
// the ack control is now responsible for chunking, etc.
let input_msg = InputMessage::new_regular(recipient, message, lane, self.packet_type);
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.task_client.is_shutdown_poll() {
if !self.shutdown_token.is_cancelled() {
error!("Failed to send message to the input buffer: {err}");
}
}
Expand Down Expand Up @@ -225,7 +219,7 @@ impl Handler {
let input_msg =
InputMessage::new_anonymous(recipient, message, reply_surbs, lane, self.packet_type);
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.task_client.is_shutdown_poll() {
if !self.shutdown_token.is_cancelled() {
error!("Failed to send anonymous message to the input buffer: {err}");
}
}
Expand Down Expand Up @@ -253,7 +247,7 @@ impl Handler {

let input_msg = InputMessage::new_reply(recipient_tag, message, lane, self.packet_type);
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.task_client.is_shutdown_poll() {
if !self.shutdown_token.is_cancelled() {
error!("Failed to send reply message to the input buffer: {err}");
}
}
Expand All @@ -275,7 +269,7 @@ impl Handler {
.client_connection_tx
.unbounded_send(ConnectionCommand::Close(connection_id))
{
if !self.task_client.is_shutdown_poll() {
if !self.shutdown_token.is_cancelled() {
error!("Failed to send close connection command: {err}");
}
}
Expand Down Expand Up @@ -394,11 +388,14 @@ impl Handler {
}

async fn listen_for_requests(&mut self, mut msg_receiver: ReconstructedMessagesReceiver) {
let mut task_client = self.task_client.fork("select");
task_client.disarm();
let shutdown_token = self.shutdown_token.clone();

while !task_client.is_shutdown() {
loop {
tokio::select! {
_ = shutdown_token.cancelled() => {
log::trace!("Websocket handler: Received shutdown");
break;
}
// we can either get a client request from the websocket
socket_msg = self.next_websocket_request() => {
if socket_msg.is_none() {
Expand Down Expand Up @@ -436,9 +433,6 @@ impl Handler {
break;
}
}
_ = task_client.recv() => {
log::trace!("Websocket handler: Received shutdown");
}
}
}
log::debug!("Websocket handler: Exiting");
Expand All @@ -464,7 +458,7 @@ impl Handler {
reconstructed_sender,
))
{
if !self.task_client.is_shutdown_poll() {
if !self.shutdown_token.is_cancelled() {
error!("failed to announce the receiver to the buffer: {err}");
}
}
Expand Down
14 changes: 7 additions & 7 deletions clients/native/src/websocket/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use super::handler::HandlerBuilder;
use log::*;
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use std::net::IpAddr;
use std::{net::SocketAddr, process, sync::Arc};
use tokio::io::AsyncWriteExt;
Expand All @@ -23,15 +23,15 @@ impl State {
pub(crate) struct Listener {
address: SocketAddr,
state: State,
task_client: TaskClient,
shutdown_token: ShutdownToken,
}

impl Listener {
pub(crate) fn new(host: IpAddr, port: u16, task_client: TaskClient) -> Self {
pub(crate) fn new(host: IpAddr, port: u16, shutdown_token: ShutdownToken) -> Self {
Listener {
address: SocketAddr::new(host, port),
state: State::AwaitingConnection,
task_client,
shutdown_token,
}
}

Expand All @@ -46,11 +46,11 @@ impl Listener {

let notify = Arc::new(Notify::new());

while !self.task_client.is_shutdown() {
while !self.shutdown_token.is_cancelled() {
tokio::select! {
// When the handler finishes we check if shutdown is signalled
_ = notify.notified() => {
if self.task_client.is_shutdown() {
if self.shutdown_token.is_cancelled() {
log::trace!("Websocket listener: detected shutdown after connection closed");
break;
}
Expand All @@ -59,7 +59,7 @@ impl Listener {
}
// ... but when there is no connected client at the time of shutdown being
// signalled, we handle it here.
_ = self.task_client.recv() => {
_ = self.shutdown_token.cancelled() => {
if !self.state.is_connected() {
log::trace!("Not connected: shutting down");
break;
Expand Down
48 changes: 48 additions & 0 deletions common/client-core/src/client/base_client/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Copyright 2023 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: Apache-2.0

use crate::error::ClientCoreError;
use crate::{client::replies::reply_storage, config::DebugConfig};
use nym_task::{ShutdownManager, ShutdownToken, ShutdownTracker};

pub fn setup_empty_reply_surb_backend(debug_config: &DebugConfig) -> reply_storage::Empty {
reply_storage::Empty {
Expand All @@ -13,3 +15,49 @@ pub fn setup_empty_reply_surb_backend(debug_config: &DebugConfig) -> reply_stora
.maximum_reply_surb_storage_threshold,
}
}

// old 'TaskHandle'
pub(crate) enum ShutdownHelper {
Internal(ShutdownManager),
External(ShutdownTracker),
}

fn new_shutdown_manager() -> Result<ShutdownManager, ClientCoreError> {
cfg_if::cfg_if! {
if #[cfg(not(target_arch = "wasm32"))] {
Ok(ShutdownManager::build_new_default()?)
} else {
Ok(ShutdownManager::new_without_signals())
}
}
}

impl ShutdownHelper {
pub(crate) fn new(shutdown_tracker: Option<ShutdownTracker>) -> Result<Self, ClientCoreError> {
match shutdown_tracker {
None => Ok(ShutdownHelper::Internal(new_shutdown_manager()?)),
Some(shutdown_tracker) => Ok(ShutdownHelper::External(shutdown_tracker)),
}
}

pub(crate) fn into_internal(self) -> Option<ShutdownManager> {
match self {
ShutdownHelper::Internal(manager) => Some(manager),
ShutdownHelper::External(_) => None,
}
}

pub(crate) fn shutdown_token(&self) -> ShutdownToken {
match self {
ShutdownHelper::External(shutdown) => shutdown.clone_shutdown_token(),
ShutdownHelper::Internal(shutdown) => shutdown.clone_shutdown_token(),
}
}

pub(crate) fn tracker(&self) -> &ShutdownTracker {
match self {
ShutdownHelper::External(shutdown) => shutdown,
ShutdownHelper::Internal(shutdown) => shutdown.shutdown_tracker(),
}
}
}
Loading
Loading