diff --git a/Cargo.lock b/Cargo.lock index 6e67ab43089..6faa70e3576 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5186,6 +5186,7 @@ version = "1.1.15" dependencies = [ "async-trait", "base64 0.22.1", + "bincode", "bs58", "cfg-if", "clap", @@ -5230,6 +5231,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-tungstenite", + "tokio-util", "tokio_with_wasm", "tracing", "tungstenite 0.20.1", @@ -5311,6 +5313,7 @@ dependencies = [ "serde-wasm-bindgen 0.6.5", "serde_json", "thiserror 2.0.17", + "tokio", "tokio_with_wasm", "tsify", "wasm-bindgen", @@ -6453,6 +6456,7 @@ dependencies = [ "time", "tokio", "tokio-tungstenite", + "tokio-util", "url", "zeroize", ] @@ -6893,6 +6897,7 @@ dependencies = [ "nym-bandwidth-controller", "nym-bin-common", "nym-client-core", + "nym-config", "nym-credential-storage", "nym-credential-utils", "nym-credentials", @@ -6900,6 +6905,7 @@ dependencies = [ "nym-crypto", "nym-gateway-requests", "nym-http-api-client", + "nym-ip-packet-requests", "nym-network-defaults", "nym-ordered-buffer", "nym-service-providers-common", @@ -6912,6 +6918,7 @@ dependencies = [ "nym-topology", "nym-validator-client", "parking_lot", + "pnet_packet", "rand 0.8.5", "reqwest 0.12.22", "serde", @@ -7049,6 +7056,7 @@ dependencies = [ "tap", "thiserror 2.0.17", "tokio", + "tokio-util", "url", ] @@ -7086,6 +7094,8 @@ dependencies = [ name = "nym-sphinx" version = "0.1.0" dependencies = [ + "bincode", + "log", "nym-crypto", "nym-metrics", "nym-mixnet-contract-common", @@ -7103,8 +7113,10 @@ dependencies = [ "rand 0.8.5", "rand_chacha 0.3.1", "rand_distr", + "serde", "thiserror 2.0.17", "tokio", + "tokio-util", "tracing", ] @@ -7152,6 +7164,7 @@ dependencies = [ "nym-topology", "rand 0.8.5", "rand_chacha 0.3.1", + "serde", "thiserror 2.0.17", "tracing", "wasm-bindgen", @@ -7200,6 +7213,7 @@ dependencies = [ "nym-sphinx-anonymous-replies", "nym-sphinx-params", "nym-sphinx-types", + "serde", "thiserror 2.0.17", ] @@ -7326,6 +7340,7 @@ dependencies = [ "futures", "log", "nym-test-utils", + "serde", "thiserror 2.0.17", "tokio", "tokio-util", diff --git a/common/client-core/Cargo.toml b/common/client-core/Cargo.toml index 2f6d87dbc41..1f0a9e45ada 100644 --- a/common/client-core/Cargo.toml +++ b/common/client-core/Cargo.toml @@ -10,6 +10,7 @@ license.workspace = true [dependencies] async-trait = { workspace = true } +bincode = { workspace = true } base64 = { workspace = true } bs58 = { workspace = true } clap = { workspace = true, optional = true } @@ -25,6 +26,7 @@ sha2 = { workspace = true } si-scale = { workspace = true } thiserror = { workspace = true } url = { workspace = true, features = ["serde"] } +tokio-util = { workspace = true, features = ["codec"] } time = { workspace = true } tokio = { workspace = true, features = ["sync", "macros"] } tracing = { workspace = true } diff --git a/common/client-core/src/client/base_client/mod.rs b/common/client-core/src/client/base_client/mod.rs index 600085b4cf8..0dc9aa05e6a 100644 --- a/common/client-core/src/client/base_client/mod.rs +++ b/common/client-core/src/client/base_client/mod.rs @@ -36,6 +36,7 @@ use crate::init::{ types::{GatewaySetup, InitialisationResult}, }; use futures::channel::mpsc; +use futures::SinkExt; use nym_bandwidth_controller::BandwidthController; use nym_client_core_config_types::{ForgetMe, RememberMe}; use nym_client_core_gateways_storage::{GatewayDetails, GatewaysDetailsStore}; @@ -66,7 +67,7 @@ use std::os::raw::c_int as RawFd; use std::path::Path; use std::sync::Arc; use time::OffsetDateTime; -use tokio::sync::mpsc::Sender; +use tokio_util::sync::{PollSendError, PollSender}; use url::Url; #[cfg(target_arch = "wasm32")] @@ -112,10 +113,7 @@ pub struct ClientInput { } impl ClientInput { - pub async fn send( - &self, - message: InputMessage, - ) -> Result<(), tokio::sync::mpsc::error::SendError> { + pub async fn send(&mut self, message: InputMessage) -> Result<(), PollSendError> { self.input_sender.send(message).await } } @@ -745,7 +743,7 @@ where config: &Config, user_agent: Option, client_stats_id: String, - input_sender: Sender, + input_sender: PollSender, shutdown_tracker: &ShutdownTracker, ) -> ClientStatsSender { tracing::info!("Starting statistics control..."); @@ -1013,7 +1011,7 @@ where &self.config, self.user_agent.clone(), generate_client_stats_id(*self_address.identity()), - input_sender.clone(), + tokio_util::sync::PollSender::new(input_sender.clone()), &shutdown_tracker.clone(), ); @@ -1139,7 +1137,7 @@ where client_input: ClientInputStatus::AwaitingProducer { client_input: ClientInput { connection_command_sender: client_connection_tx, - input_sender, + input_sender: PollSender::new(input_sender), client_request_sender, }, }, diff --git a/common/client-core/src/client/inbound_messages.rs b/common/client-core/src/client/inbound_messages.rs index b74c9195bea..f01ba54749c 100644 --- a/common/client-core/src/client/inbound_messages.rs +++ b/common/client-core/src/client/inbound_messages.rs @@ -1,16 +1,27 @@ // Copyright 2020-2023 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 - +use crate::error::ClientCoreError; +use crate::make_bincode_serializer; +use bincode::Options; use nym_sphinx::addressing::clients::Recipient; use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag; use nym_sphinx::forwarding::packet::MixPacket; use nym_sphinx::params::PacketType; use nym_task::connections::TransmissionLane; +use serde::{Deserialize, Serialize}; +use std::convert::TryInto; +use tokio_util::{ + bytes::Buf, + bytes::BytesMut, + codec::{Decoder, Encoder}, +}; -pub type InputMessageSender = tokio::sync::mpsc::Sender; +pub type InputMessageSender = tokio_util::sync::PollSender; pub type InputMessageReceiver = tokio::sync::mpsc::Receiver; -#[derive(Debug)] +const LENGHT_ENCODING_PREFIX_SIZE: usize = 4; + +#[derive(Serialize, Deserialize, Debug)] pub enum InputMessage { /// Fire an already prepared mix packets into the network. /// No guarantees are made about it. For example no retransmssion @@ -65,6 +76,10 @@ pub enum InputMessage { } impl InputMessage { + pub fn simple(data: &[u8], recipient: Recipient) -> Self { + InputMessage::new_regular(recipient, data.to_vec(), TransmissionLane::General, None) + } + pub fn new_premade( msgs: Vec, lane: TransmissionLane, @@ -185,4 +200,336 @@ impl InputMessage { self.set_max_retransmissions(max_retransmissions); self } + #[allow(clippy::expect_used)] + pub fn serialized_size(&self) -> u64 { + make_bincode_serializer() + .serialized_size(self) + .expect("failed to get serialized InputMessage size") + + LENGHT_ENCODING_PREFIX_SIZE as u64 + } +} + +pub struct AdressedInputMessageCodec(pub Recipient); + +impl Encoder<&[u8]> for AdressedInputMessageCodec { + type Error = ClientCoreError; + + fn encode(&mut self, item: &[u8], buf: &mut BytesMut) -> Result<(), Self::Error> { + let mut codec = InputMessageCodec; + let input_message = InputMessage::simple(item, self.0); + codec.encode(input_message, buf)?; + Ok(()) + } +} + +pub struct InputMessageCodec; + +impl Encoder for InputMessageCodec { + type Error = ClientCoreError; + + fn encode(&mut self, item: InputMessage, buf: &mut BytesMut) -> Result<(), Self::Error> { + #[allow(clippy::expect_used)] + let encoded = make_bincode_serializer().serialize(&item)?; + let encoded_len = encoded.len() as u32; + let mut encoded_with_len = encoded_len.to_le_bytes().to_vec(); + encoded_with_len.extend(encoded); + buf.reserve(encoded_with_len.len()); + buf.extend_from_slice(&encoded_with_len); + Ok(()) + } +} + +impl Decoder for InputMessageCodec { + type Item = InputMessage; + type Error = ClientCoreError; + + fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { + if buf.len() < LENGHT_ENCODING_PREFIX_SIZE { + return Ok(None); + } + #[allow(clippy::expect_used)] + let len = u32::from_le_bytes(buf[0..LENGHT_ENCODING_PREFIX_SIZE].try_into()?) as usize; + if buf.len() < len + LENGHT_ENCODING_PREFIX_SIZE { + return Ok(None); + } + + let decoded = make_bincode_serializer() + .deserialize(&buf[LENGHT_ENCODING_PREFIX_SIZE..len + LENGHT_ENCODING_PREFIX_SIZE])?; + + buf.advance(len + LENGHT_ENCODING_PREFIX_SIZE); + + Ok(Some(decoded)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use nym_sphinx::addressing::clients::Recipient; + use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag; + use nym_sphinx::params::PacketType; + use rand::SeedableRng; + + fn test_recipient() -> Recipient { + Recipient::try_from_base58_string("CytBseW6yFXUMzz4SGAKdNLGR7q3sJLLYxyBGvutNEQV.4QXYyEVc5fUDjmmi8PrHN9tdUFV4PCvSJE1278cHyvoe@4sBbL1ngf1vtNqykydQKTFh26sQCw888GpUqvPvyNB4f").unwrap() + } + + fn test_sender_tag() -> AnonymousSenderTag { + let dummy_seed = [42u8; 32]; + let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed); + AnonymousSenderTag::new_random(&mut rng) + } + + #[test] + fn encode_decode_all_variants() { + let mut codec = InputMessageCodec; + { + let mut buf = BytesMut::new(); + let msg = InputMessage::new_anonymous( + test_recipient(), + vec![1, 2, 3, 4, 5], + 3, + TransmissionLane::General, + None, + ); + codec.encode(msg, &mut buf).unwrap(); + let decoded = codec + .decode(&mut buf) + .unwrap() + .expect("Should decode message"); + + match decoded { + InputMessage::Anonymous { + data, reply_surbs, .. + } => { + assert_eq!(data, vec![1, 2, 3, 4, 5]); + assert_eq!(reply_surbs, 3); + } + _ => panic!("Expected Anonymous variant"), + } + } + + { + let mut buf = BytesMut::new(); + let msg = InputMessage::new_reply( + test_sender_tag(), + vec![6, 7, 8], + TransmissionLane::General, + None, + ); + codec.encode(msg, &mut buf).unwrap(); + let decoded = codec + .decode(&mut buf) + .unwrap() + .expect("Should decode message"); + + match decoded { + InputMessage::Reply { data, .. } => { + assert_eq!(data, vec![6, 7, 8]); + } + _ => panic!("Expected Reply variant"), + } + } + + { + let mut buf = BytesMut::new(); + let inner = InputMessage::new_anonymous( + test_recipient(), + vec![9, 10], + 2, + TransmissionLane::General, + None, + ); + let msg = InputMessage::new_wrapper(inner, PacketType::Mix); + codec.encode(msg, &mut buf).unwrap(); + let decoded = codec + .decode(&mut buf) + .unwrap() + .expect("Should decode message"); + + match decoded { + InputMessage::MessageWrapper { + message, + packet_type, + } => { + assert_eq!(packet_type, PacketType::Mix); + match *message { + InputMessage::Anonymous { + data, reply_surbs, .. + } => { + assert_eq!(data, vec![9, 10]); + assert_eq!(reply_surbs, 2); + } + _ => panic!("Expected Anonymous inner message"), + } + } + _ => panic!("Expected MessageWrapper variant"), + } + } + } + + #[test] + fn encode_decode_sequential_messages() { + let mut codec = InputMessageCodec; + let mut buf = BytesMut::new(); + + codec + .encode( + InputMessage::new_anonymous( + test_recipient(), + vec![1, 2, 3], + 1, + TransmissionLane::General, + None, + ), + &mut buf, + ) + .unwrap(); + + codec + .encode( + InputMessage::new_anonymous( + test_recipient(), + vec![4, 5, 6, 7], + 2, + TransmissionLane::General, + None, + ), + &mut buf, + ) + .unwrap(); + + codec + .encode( + InputMessage::new_anonymous( + test_recipient(), + vec![8, 9], + 3, + TransmissionLane::General, + None, + ), + &mut buf, + ) + .unwrap(); + + let decoded1 = codec + .decode(&mut buf) + .unwrap() + .expect("Should decode first message"); + match decoded1 { + InputMessage::Anonymous { + data, reply_surbs, .. + } => { + assert_eq!(data, vec![1, 2, 3]); + assert_eq!(reply_surbs, 1); + } + _ => panic!("Wrong variant"), + } + + let decoded2 = codec + .decode(&mut buf) + .unwrap() + .expect("Should decode second message"); + match decoded2 { + InputMessage::Anonymous { + data, reply_surbs, .. + } => { + assert_eq!(data, vec![4, 5, 6, 7]); + assert_eq!(reply_surbs, 2); + } + _ => panic!("Wrong variant"), + } + + let decoded3 = codec + .decode(&mut buf) + .unwrap() + .expect("Should decode third message"); + match decoded3 { + InputMessage::Anonymous { + data, reply_surbs, .. + } => { + assert_eq!(data, vec![8, 9]); + assert_eq!(reply_surbs, 3); + } + _ => panic!("Wrong variant"), + } + + // Buffer should be empty + let decoded4 = codec.decode(&mut buf).unwrap(); + assert!(decoded4.is_none(), "Should have no more messages"); + assert_eq!(buf.len(), 0, "Buffer should be empty"); + } + + #[test] + fn partial_message_handling() { + let mut codec = InputMessageCodec; + let mut buf = BytesMut::new(); + // Empty @ beginning + assert!(codec.decode(&mut buf).unwrap().is_none()); + + let mut buf = BytesMut::from(&[0x10, 0x00][..]); + assert!(codec.decode(&mut buf).unwrap().is_none()); + assert_eq!(buf.len(), 2, "Buffer should be unchanged"); + + let mut full_buf = BytesMut::new(); + codec + .encode( + InputMessage::new_anonymous( + test_recipient(), + vec![1, 2, 3, 4, 5], + 2, + TransmissionLane::General, + None, + ), + &mut full_buf, + ) + .unwrap(); + + // Only first half of the message + let partial_len = full_buf.len() / 2; + let mut partial_buf = full_buf.split_to(partial_len); + + assert!(codec.decode(&mut partial_buf).unwrap().is_none()); + assert_eq!(partial_buf.len(), partial_len, "Buffer should be unchanged"); + + partial_buf.unsplit(full_buf); + let decoded = codec.decode(&mut partial_buf).unwrap(); + assert!(decoded.is_some(), "Should decode complete message"); + match decoded.unwrap() { + InputMessage::Anonymous { data, .. } => { + assert_eq!(data, vec![1, 2, 3, 4, 5]); + } + _ => panic!("Expected Anonymous variant"), + } + } + + #[test] + fn addressed_codec_compatibility() { + let recipient = test_recipient(); + let data = b"test message payload"; + + let mut addressed_codec = AdressedInputMessageCodec(recipient); + let mut buf = BytesMut::new(); + addressed_codec.encode(data.as_ref(), &mut buf).unwrap(); + + let mut input_codec = InputMessageCodec; + let decoded = input_codec + .decode(&mut buf) + .unwrap() + .expect("Should decode"); + + match decoded { + InputMessage::Regular { + data: decoded_data, + recipient: decoded_recipient, + lane, + .. + } => { + assert_eq!(decoded_data, data, "Data should match"); + assert_eq!(decoded_recipient, recipient, "Recipient should match"); + assert_eq!(lane, TransmissionLane::General, "Should use General lane"); + } + _ => panic!("Expected Regular variant"), + } + } } diff --git a/common/client-core/src/client/statistics_control.rs b/common/client-core/src/client/statistics_control.rs index dcfbd2e19c5..fd3d8a48ecd 100644 --- a/common/client-core/src/client/statistics_control.rs +++ b/common/client-core/src/client/statistics_control.rs @@ -17,7 +17,7 @@ #![warn(clippy::dbg_macro)] use crate::client::inbound_messages::{InputMessage, InputMessageSender}; -use futures::StreamExt; +use futures::{SinkExt, StreamExt}; use nym_client_core_config_types::StatsReporting; use nym_sphinx::addressing::Recipient; use nym_statistics_common::clients::{ diff --git a/common/client-core/src/error.rs b/common/client-core/src/error.rs index 44eca8ac0bb..30344589be8 100644 --- a/common/client-core/src/error.rs +++ b/common/client-core/src/error.rs @@ -255,6 +255,12 @@ pub enum ClientCoreError { #[error("Could not access task registry, {0}")] RegistryAccess(#[from] RegistryAccessError), + + #[error("Serialization error: {0}")] + BincodeError(#[from] Box), + + #[error("Could not coarce to array")] + ArrayCreationFailure(#[from] std::array::TryFromSliceError), } impl From for ClientCoreError { diff --git a/common/client-core/src/lib.rs b/common/client-core/src/lib.rs index 1f2faf36597..898e0b880a6 100644 --- a/common/client-core/src/lib.rs +++ b/common/client-core/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(deprecated)] // silences clippy warning: use of deprecated associated function `nym_crypto::generic_array::GenericArray::::clone_from_slice`: please upgrade to generic-array 1.x - TODO use std::future::Future; #[cfg(all( @@ -39,3 +40,10 @@ where { tokio::spawn(future); } + +fn make_bincode_serializer() -> impl bincode::Options { + use bincode::Options; + bincode::DefaultOptions::new() + .with_big_endian() + .with_varint_encoding() +} diff --git a/common/client-core/surb-storage/src/lib.rs b/common/client-core/surb-storage/src/lib.rs index 079c213bbc7..ffd7ad85f6f 100644 --- a/common/client-core/surb-storage/src/lib.rs +++ b/common/client-core/surb-storage/src/lib.rs @@ -1,6 +1,7 @@ // Copyright 2022 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 +#![allow(deprecated)] // silences clippy warning: use of deprecated associated function `nym_crypto::generic_array::GenericArray::::from_exact_iter`: please upgrade to generic-array 1.x - TODO pub use backend::*; pub use combined::CombinedReplyStorage; pub use key_storage::SentReplyKeys; diff --git a/common/cosmwasm-smart-contracts/coconut-dkg/src/types.rs b/common/cosmwasm-smart-contracts/coconut-dkg/src/types.rs index 5cf5fd780e0..d7825da59fa 100644 --- a/common/cosmwasm-smart-contracts/coconut-dkg/src/types.rs +++ b/common/cosmwasm-smart-contracts/coconut-dkg/src/types.rs @@ -1,6 +1,8 @@ // Copyright 2022-2023 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 +#![allow(clippy::derivable_impls)] +// MAX: surpressing warning for the moment, will be dealt with in a different PR (TODO) use cosmwasm_schema::cw_serde; use std::fmt::{Display, Formatter}; use std::str::FromStr; diff --git a/common/crypto/src/lib.rs b/common/crypto/src/lib.rs index 1dff7b82be0..e8426bb0854 100644 --- a/common/crypto/src/lib.rs +++ b/common/crypto/src/lib.rs @@ -1,6 +1,8 @@ // Copyright 2021 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 +#![allow(deprecated)] // silences clippy warning: deprecated associated function `generic_array::GenericArray::::from_exact_iter`: please upgrade to generic-array 1.x - TODO + #[cfg(feature = "asymmetric")] pub mod asymmetric; pub mod bech32_address_validation; diff --git a/common/gateway-requests/src/lib.rs b/common/gateway-requests/src/lib.rs index b63b29dca1a..e577cb0e5a0 100644 --- a/common/gateway-requests/src/lib.rs +++ b/common/gateway-requests/src/lib.rs @@ -1,5 +1,6 @@ // Copyright 2020-2022 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 +#![allow(deprecated)] // silences clippy warning: deprecated associated function `nym_crypto::generic_array::GenericArray::::clone_from_slice`: please upgrade to generic-array 1.x - TODO pub use nym_crypto::generic_array; use nym_crypto::OutputSizeUser; diff --git a/common/gateway-requests/src/types/binary_request.rs b/common/gateway-requests/src/types/binary_request.rs index a0e75385f94..d38d4f7d198 100644 --- a/common/gateway-requests/src/types/binary_request.rs +++ b/common/gateway-requests/src/types/binary_request.rs @@ -70,7 +70,7 @@ impl BinaryRequest { let plaintext = match self { BinaryRequest::ForwardSphinx { packet } => packet.into_v1_bytes()?, - BinaryRequest::ForwardSphinxV2 { packet } => packet.into_v2_bytes()?, + BinaryRequest::ForwardSphinxV2 { packet } => packet.to_v2_bytes()?, }; BinaryData::make_encrypted_blob(kind as u8, &plaintext, shared_key) diff --git a/common/http-api-client/src/lib.rs b/common/http-api-client/src/lib.rs index bc3a8d7430c..c9feb52d601 100644 --- a/common/http-api-client/src/lib.rs +++ b/common/http-api-client/src/lib.rs @@ -1,6 +1,9 @@ // Copyright 2023 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 +#![allow(deprecated)] +// silences clippy warning: use of deprecated tuple variant `HttpClientError::GenericRequestFailure`: use another more strongly typed variant - this variant is only left for compatibility reasons - TODO + //! Nym HTTP API Client //! //! Centralizes and implements the core API client functionality. This crate provides custom, diff --git a/common/nym-connection-monitor/src/mixnet_beacon.rs b/common/nym-connection-monitor/src/mixnet_beacon.rs index 021519ef172..77e007092d6 100644 --- a/common/nym-connection-monitor/src/mixnet_beacon.rs +++ b/common/nym-connection-monitor/src/mixnet_beacon.rs @@ -28,7 +28,7 @@ impl MixnetConnectionBeacon { } } - async fn send_mixnet_self_ping(&self) -> Result { + async fn send_mixnet_self_ping(&mut self) -> Result { trace!("Sending mixnet self ping"); let (input_message, request_id) = create_self_ping(self.our_address); self.mixnet_client_sender @@ -38,7 +38,7 @@ impl MixnetConnectionBeacon { Ok(request_id) } - pub async fn run(self, shutdown: CancellationToken) -> Result<()> { + pub async fn run(mut self, shutdown: CancellationToken) -> Result<()> { debug!("Mixnet connection beacon is running"); let mut ping_interval = tokio::time::interval(MIXNET_SELF_PING_INTERVAL); loop { diff --git a/common/nym-connection-monitor/src/sync_self_ping.rs b/common/nym-connection-monitor/src/sync_self_ping.rs index 79ec9a10984..80402571022 100644 --- a/common/nym-connection-monitor/src/sync_self_ping.rs +++ b/common/nym-connection-monitor/src/sync_self_ping.rs @@ -3,7 +3,6 @@ use std::time::Duration; -use futures::StreamExt; use nym_sdk::mixnet::{MixnetClient, MixnetMessageSender, Recipient}; use tracing::{debug, error}; @@ -22,23 +21,22 @@ pub async fn self_ping_and_wait( wait_for_self_ping_return(mixnet_client, &request_ids).await } -async fn send_self_pings(our_address: Recipient, mixnet_client: &MixnetClient) -> Result> { - // Send pings - let request_ids = futures::stream::iter(1..=3) - .then(|_| async { - let (input_message, request_id) = create_self_ping(our_address); - mixnet_client - .send(input_message) - .await - .map_err(|err| Error::NymSdkError(Box::new(err)))?; - Ok::(request_id) - }) - .collect::>() - .await; +async fn send_self_pings( + our_address: Recipient, + mixnet_client: &mut MixnetClient, +) -> Result> { + let mut request_ids = Vec::with_capacity(3); + + for _ in 1..=3 { + let (input_message, request_id) = create_self_ping(our_address); + mixnet_client + .send(input_message) + .await + .map_err(|err| Error::NymSdkError(Box::new(err)))?; + request_ids.push(request_id); + } - // Check the vec of results and return the first error, if any. If there are not errors, unwrap - // all the results into a vec of u64s. - request_ids.into_iter().collect::>>() + Ok(request_ids) } async fn wait_for_self_ping_return( diff --git a/common/nymsphinx/Cargo.toml b/common/nymsphinx/Cargo.toml index d6b77f6969b..5c9dae785ad 100644 --- a/common/nymsphinx/Cargo.toml +++ b/common/nymsphinx/Cargo.toml @@ -13,6 +13,10 @@ rand = { workspace = true } rand_distr = { workspace = true } rand_chacha = { workspace = true } thiserror = { workspace = true } +serde = { workspace = true, features = ["derive"] } +bincode = { workspace = true } +log = { workspace = true } +tokio-util = { workspace = true, features = ["codec"] } nym-sphinx-acknowledgements = { path = "acknowledgements" } nym-sphinx-addressing = { path = "addressing" } @@ -47,11 +51,5 @@ features = ["sync"] [features] default = ["sphinx"] -sphinx = [ - "nym-sphinx-params/sphinx", - "nym-sphinx-types/sphinx", -] -outfox = [ - "nym-sphinx-params/outfox", - "nym-sphinx-types/outfox", -] +sphinx = ["nym-sphinx-params/sphinx", "nym-sphinx-types/sphinx"] +outfox = ["nym-sphinx-params/outfox", "nym-sphinx-types/outfox"] diff --git a/common/nymsphinx/acknowledgements/src/lib.rs b/common/nymsphinx/acknowledgements/src/lib.rs index 490e94eaea8..d37990d49f1 100644 --- a/common/nymsphinx/acknowledgements/src/lib.rs +++ b/common/nymsphinx/acknowledgements/src/lib.rs @@ -1,5 +1,6 @@ // Copyright 2021 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 +#![allow(deprecated)] // silences clippy warning: deprecated associated function `nym_crypto::generic_array::GenericArray::::clone_from_slice`: please upgrade to generic-array 1.x - TODO pub mod identifier; pub mod key; diff --git a/common/nymsphinx/anonymous-replies/Cargo.toml b/common/nymsphinx/anonymous-replies/Cargo.toml index ef9c74b73da..278dbc60104 100644 --- a/common/nymsphinx/anonymous-replies/Cargo.toml +++ b/common/nymsphinx/anonymous-replies/Cargo.toml @@ -12,6 +12,7 @@ rand = { workspace = true } bs58 = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } +serde = { workspace = true } nym-crypto = { path = "../../crypto", features = ["stream_cipher", "rand"] } nym-sphinx-addressing = { path = "../addressing" } diff --git a/common/nymsphinx/anonymous-replies/src/lib.rs b/common/nymsphinx/anonymous-replies/src/lib.rs index 2c47a23a760..84bfa7a3231 100644 --- a/common/nymsphinx/anonymous-replies/src/lib.rs +++ b/common/nymsphinx/anonymous-replies/src/lib.rs @@ -1,6 +1,7 @@ // Copyright 2021 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 +#![allow(deprecated)] // silences clippy warning: deprecated struct `nym_crypto::generic_array::GenericArray`: please upgrade to generic-array 1.x - TODO pub mod encryption_key; pub mod reply_surb; pub mod requests; diff --git a/common/nymsphinx/anonymous-replies/src/requests/mod.rs b/common/nymsphinx/anonymous-replies/src/requests/mod.rs index ecb2fc7ac50..b6916a20e8b 100644 --- a/common/nymsphinx/anonymous-replies/src/requests/mod.rs +++ b/common/nymsphinx/anonymous-replies/src/requests/mod.rs @@ -7,6 +7,7 @@ use crate::{ReplySurbError, ReplySurbWithKeyRotation}; use nym_sphinx_addressing::clients::{Recipient, RecipientFormattingError}; use nym_sphinx_params::key_rotation::InvalidSphinxKeyRotation; use rand::{CryptoRng, RngCore}; +use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; use std::mem; use thiserror::Error; @@ -30,7 +31,7 @@ pub enum InvalidAnonymousSenderTagRepresentation { InvalidLength { received: usize, expected: usize }, } -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)] #[cfg_attr(target_arch = "wasm32", wasm_bindgen)] pub struct AnonymousSenderTag([u8; SENDER_TAG_SIZE]); diff --git a/common/nymsphinx/forwarding/Cargo.toml b/common/nymsphinx/forwarding/Cargo.toml index c8beb33b12f..b2b25bab0c2 100644 --- a/common/nymsphinx/forwarding/Cargo.toml +++ b/common/nymsphinx/forwarding/Cargo.toml @@ -13,3 +13,4 @@ nym-sphinx-params = { path = "../params" } nym-sphinx-types = { path = "../types", features = ["sphinx", "outfox"] } nym-sphinx-anonymous-replies = { path = "../anonymous-replies" } thiserror = { workspace = true } +serde = { workspace = true } \ No newline at end of file diff --git a/common/nymsphinx/forwarding/src/packet.rs b/common/nymsphinx/forwarding/src/packet.rs index 9fb9f3a9155..32b8c893ee2 100644 --- a/common/nymsphinx/forwarding/src/packet.rs +++ b/common/nymsphinx/forwarding/src/packet.rs @@ -4,6 +4,11 @@ use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, NymNodeRoutingAddressError}; use nym_sphinx_params::{PacketSize, PacketType, SphinxKeyRotation}; use nym_sphinx_types::{NymPacket, NymPacketError}; +use serde::{ + Deserialize, Deserializer, Serialize, Serializer, + de::{self, Visitor}, +}; +use std::fmt; use nym_sphinx_anonymous_replies::reply_surb::AppliedReplySurb; use nym_sphinx_params::key_rotation::InvalidSphinxKeyRotation; @@ -174,7 +179,7 @@ impl MixPacket { }) } - pub fn into_v2_bytes(self) -> Result, MixPacketFormattingError> { + pub fn to_v2_bytes(&self) -> Result, MixPacketFormattingError> { Ok(std::iter::once(self.packet_type as u8) .chain(std::iter::once(self.key_rotation as u8)) .chain(self.next_hop.as_bytes()) @@ -183,4 +188,31 @@ impl MixPacket { } } +// MAX TODO implement for v1 as well for back compat? - this was added in the original asyncread/write work when we only had one v +impl Serialize for MixPacket { + fn serialize(&self, serializer: S) -> Result { + serializer.serialize_bytes(&self.to_v2_bytes().map_err(serde::ser::Error::custom)?) + } +} + +struct MixPacketVisitor; + +impl<'de> Visitor<'de> for MixPacketVisitor { + type Value = MixPacket; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a byte array representing a mix packet") + } + + fn visit_bytes(self, v: &[u8]) -> Result { + MixPacket::try_from_v2_bytes(v).map_err(serde::de::Error::custom) + } +} + +impl<'de> Deserialize<'de> for MixPacket { + fn deserialize>(deserializer: D) -> Result { + deserializer.deserialize_bytes(MixPacketVisitor) + } +} + // TODO: test for serialization and errors! diff --git a/common/nymsphinx/src/lib.rs b/common/nymsphinx/src/lib.rs index c27867635b1..7d725dfc875 100644 --- a/common/nymsphinx/src/lib.rs +++ b/common/nymsphinx/src/lib.rs @@ -22,3 +22,10 @@ pub use nym_sphinx_framing as framing; // TEMP UNTIL FURTHER REFACTORING pub use preparer::payload::NymPayloadBuilder; + +fn make_bincode_serializer() -> impl bincode::Options { + use bincode::Options; + bincode::DefaultOptions::new() + .with_big_endian() + .with_varint_encoding() +} diff --git a/common/nymsphinx/src/receiver.rs b/common/nymsphinx/src/receiver.rs index eb83ec43ea0..da38e7fb092 100644 --- a/common/nymsphinx/src/receiver.rs +++ b/common/nymsphinx/src/receiver.rs @@ -1,7 +1,11 @@ // Copyright 2021 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 +use std::io; + +use crate::make_bincode_serializer; use crate::message::{NymMessage, NymMessageError, PaddedMessage, PlainMessage}; +use bincode::Options; use nym_crypto::aes::cipher::{KeyIvInit, StreamCipher}; use nym_crypto::asymmetric::x25519; use nym_crypto::shared_key::recompute_shared_key; @@ -15,10 +19,13 @@ use nym_sphinx_chunking::reconstruction::MessageReconstructor; use nym_sphinx_params::{ PacketEncryptionAlgorithm, PacketHkdfAlgorithm, ReplySurbEncryptionAlgorithm, }; +use serde::{Deserialize, Serialize}; use thiserror::Error; +use tokio_util::bytes::{Buf, BytesMut}; +use tokio_util::codec::{Decoder, Encoder}; // TODO: should this live in this file? -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct ReconstructedMessage { /// The actual plaintext message that was received. pub message: Vec, @@ -56,6 +63,50 @@ impl From for ReconstructedMessage { } } +pub struct ReconstructedMessageCodec; +const LENGHT_ENCODING_PREFIX_SIZE: usize = 4; + +impl Encoder for ReconstructedMessageCodec { + type Error = MessageRecoveryError; + + fn encode( + &mut self, + item: ReconstructedMessage, + buf: &mut BytesMut, + ) -> Result<(), Self::Error> { + let encoded = make_bincode_serializer().serialize(&item)?; + let encoded_len = encoded.len() as u32; + buf.reserve(LENGHT_ENCODING_PREFIX_SIZE + encoded.len()); + buf.extend_from_slice(&encoded_len.to_le_bytes()); + buf.extend_from_slice(&encoded); + Ok(()) + } +} + +impl Decoder for ReconstructedMessageCodec { + type Item = ReconstructedMessage; + type Error = MessageRecoveryError; + + fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { + if buf.len() < LENGHT_ENCODING_PREFIX_SIZE { + return Ok(None); + } + + let len = u32::from_le_bytes(buf[0..LENGHT_ENCODING_PREFIX_SIZE].try_into()?) as usize; + + if buf.len() < len + LENGHT_ENCODING_PREFIX_SIZE { + return Ok(None); + } + + let decoded = make_bincode_serializer() + .deserialize(&buf[LENGHT_ENCODING_PREFIX_SIZE..len + LENGHT_ENCODING_PREFIX_SIZE])?; + + buf.advance(len + LENGHT_ENCODING_PREFIX_SIZE); + + Ok(Some(decoded)) + } +} + #[derive(Debug, Error)] pub enum MessageRecoveryError { #[error( @@ -75,6 +126,15 @@ pub enum MessageRecoveryError { #[error("Failed to recover message fragment - {0}")] FragmentRecoveryError(#[from] ChunkingError), + + #[error("Failed to recover message fragment - {0}")] + MessageRecoveryError(#[from] io::Error), + + #[error("Failed to serialize/deserialize message")] + SerializationError(#[from] Box), + + #[error("Invalid length prefix bytes")] + InvalidLengthPrefix(#[from] std::array::TryFromSliceError), } pub trait MessageReceiver { diff --git a/common/socks5-client-core/Cargo.toml b/common/socks5-client-core/Cargo.toml index ce8421f3d73..d5019e3c403 100644 --- a/common/socks5-client-core/Cargo.toml +++ b/common/socks5-client-core/Cargo.toml @@ -19,6 +19,7 @@ serde = { workspace = true, features = ["derive"] } # for config serialization/d tap = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "net", "signal"] } +tokio-util = { workspace = true } url = { workspace = true } nym-bandwidth-controller = { path = "../../common/bandwidth-controller" } diff --git a/common/socks5-client-core/src/socks/client.rs b/common/socks5-client-core/src/socks/client.rs index ffdca82a70f..868ced71b59 100644 --- a/common/socks5-client-core/src/socks/client.rs +++ b/common/socks5-client-core/src/socks/client.rs @@ -7,6 +7,7 @@ use super::{SocksVersion, RESERVED, SOCKS4_VERSION, SOCKS5_VERSION}; use crate::config; use futures::channel::mpsc; use futures::task::{Context, Poll}; +use futures::SinkExt; use log::*; use nym_client_core::client::inbound_messages::{InputMessage, InputMessageSender}; use nym_service_providers_common::interface::{ProviderInterfaceVersion, RequestVersion}; diff --git a/common/socks5/proxy-helpers/src/ordered_sender.rs b/common/socks5/proxy-helpers/src/ordered_sender.rs index d71f8435587..de30293eb04 100644 --- a/common/socks5/proxy-helpers/src/ordered_sender.rs +++ b/common/socks5/proxy-helpers/src/ordered_sender.rs @@ -3,11 +3,12 @@ use crate::proxy_runner::MixProxySender; use bytes::Bytes; +use futures::SinkExt; use log::{debug, error}; use nym_socks5_requests::{ConnectionId, SocketData}; use std::io; -pub(crate) struct OrderedMessageSender { +pub(crate) struct OrderedMessageSender { connection_id: ConnectionId, // addresses are provided for better logging local_destination_address: String, @@ -18,7 +19,7 @@ pub(crate) struct OrderedMessageSender { mix_message_adapter: F, } -impl OrderedMessageSender +impl OrderedMessageSender where F: Fn(SocketData) -> S, { @@ -55,7 +56,7 @@ where (self.mix_message_adapter)(data) } - async fn send_message(&self, message: S) { + async fn send_message(&mut self, message: S) { if self.mixnet_sender.send(message).await.is_err() { panic!("BatchRealMessageReceiver has stopped receiving!") } diff --git a/common/socks5/proxy-helpers/src/proxy_runner/inbound.rs b/common/socks5/proxy-helpers/src/proxy_runner/inbound.rs index 2e3087bbc02..d1b5653d6ea 100644 --- a/common/socks5/proxy-helpers/src/proxy_runner/inbound.rs +++ b/common/socks5/proxy-helpers/src/proxy_runner/inbound.rs @@ -74,7 +74,7 @@ async fn wait_for_lane( } } -pub(super) async fn run_inbound( +pub(super) async fn run_inbound( mut reader: OwnedReadHalf, mut message_sender: OrderedMessageSender, connection_id: ConnectionId, diff --git a/common/socks5/proxy-helpers/src/proxy_runner/mod.rs b/common/socks5/proxy-helpers/src/proxy_runner/mod.rs index 75a51e366a5..6da9fcd4664 100644 --- a/common/socks5/proxy-helpers/src/proxy_runner/mod.rs +++ b/common/socks5/proxy-helpers/src/proxy_runner/mod.rs @@ -9,6 +9,7 @@ use nym_task::ShutdownTracker; use std::fmt::Debug; use std::{sync::Arc, time::Duration}; use tokio::{net::TcpStream, sync::Notify}; +use tokio_util::sync::PollSender; mod inbound; mod outbound; @@ -35,7 +36,7 @@ impl From<(Vec, bool)> for ProxyMessage { } } -pub type MixProxySender = tokio::sync::mpsc::Sender; +pub type MixProxySender = PollSender; pub type MixProxyReader = tokio::sync::mpsc::Receiver; // TODO: when we finally get to implementing graceful shutdown, diff --git a/common/store-cipher/src/lib.rs b/common/store-cipher/src/lib.rs index 98d586151fc..5119021d166 100644 --- a/common/store-cipher/src/lib.rs +++ b/common/store-cipher/src/lib.rs @@ -1,6 +1,7 @@ // Copyright 2023 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 +#![allow(deprecated)] use aes_gcm::aead::{Aead, Nonce}; use aes_gcm::{AeadCore, AeadInPlace, KeyInit}; use rand::{thread_rng, CryptoRng, Fill, RngCore}; diff --git a/common/task/Cargo.toml b/common/task/Cargo.toml index d96fcbe0686..dd163d65a20 100644 --- a/common/task/Cargo.toml +++ b/common/task/Cargo.toml @@ -15,6 +15,7 @@ thiserror = { workspace = true } tokio = { workspace = true, features = ["macros", "sync"] } tokio-util = { workspace = true, features = ["rt"] } tracing = { workspace = true } +serde = { workspace = true } [target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio] workspace = true @@ -39,4 +40,4 @@ tokio = { workspace = true, features = ["rt-multi-thread", "net", "signal", "tes nym-test-utils = { path = "../test-utils" } [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/common/task/src/connections.rs b/common/task/src/connections.rs index b0e8fc6ecca..d0e0c4c269a 100644 --- a/common/task/src/connections.rs +++ b/common/task/src/connections.rs @@ -2,13 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 use futures::channel::mpsc; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; // const LANE_CONSIDERED_CLEAR: usize = 10; pub type ConnectionId = u64; -#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)] +#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)] pub enum TransmissionLane { General, // we need to treat surb-related requests and responses at higher priority diff --git a/gateway/src/node/internal_service_providers/authenticator/mixnet_listener.rs b/gateway/src/node/internal_service_providers/authenticator/mixnet_listener.rs index c05d9d8cd6f..de341a1c1cc 100644 --- a/gateway/src/node/internal_service_providers/authenticator/mixnet_listener.rs +++ b/gateway/src/node/internal_service_providers/authenticator/mixnet_listener.rs @@ -935,7 +935,7 @@ impl MixnetListener { // When an incoming mixnet message triggers a response that we send back. async fn handle_response( - &self, + &mut self, response: Vec, recipient: Option, sender_tag: Option, diff --git a/nym-gateway-probe/src/icmp.rs b/nym-gateway-probe/src/icmp.rs index ec757623f18..7c7cd4bde7b 100644 --- a/nym-gateway-probe/src/icmp.rs +++ b/nym-gateway-probe/src/icmp.rs @@ -20,7 +20,7 @@ pub fn icmp_identifier() -> u16 { } pub async fn send_ping_v4( - mixnet_client: &MixnetClient, + mixnet_client: &mut MixnetClient, our_ips: IpPair, sequence_number: u16, destination: Ipv4Addr, @@ -42,7 +42,7 @@ pub async fn send_ping_v4( } pub async fn send_ping_v6( - mixnet_client: &MixnetClient, + mixnet_client: &mut MixnetClient, our_ips: IpPair, sequence_number: u16, destination: Ipv6Addr, diff --git a/nym-gateway-probe/src/lib.rs b/nym-gateway-probe/src/lib.rs index 5f7827dff01..c7a25401243 100644 --- a/nym-gateway-probe/src/lib.rs +++ b/nym-gateway-probe/src/lib.rs @@ -776,7 +776,7 @@ async fn do_ping_exit( } async fn send_icmp_pings( - mixnet_client: &MixnetClient, + mixnet_client: &mut MixnetClient, our_ips: IpPair, exit_router_address: Recipient, ) -> anyhow::Result<()> { diff --git a/nym-ip-packet-client/Cargo.toml b/nym-ip-packet-client/Cargo.toml index 86b0e218302..8d5ce3e7493 100644 --- a/nym-ip-packet-client/Cargo.toml +++ b/nym-ip-packet-client/Cargo.toml @@ -15,10 +15,9 @@ workspace = true bincode.workspace = true bytes.workspace = true futures.workspace = true +nym-ip-packet-requests = { path = "../common/ip-packet-requests" } +nym-sdk = {path = "../sdk/rust/nym-sdk" } thiserror.workspace = true tokio-util.workspace = true tokio.workspace = true tracing.workspace = true - -nym-sdk = { path = "../sdk/rust/nym-sdk" } -nym-ip-packet-requests = { path = "../common/ip-packet-requests" } diff --git a/nym-ip-packet-client/src/connect.rs b/nym-ip-packet-client/src/connect.rs index bf679f67cf9..86d1b3ce4c5 100644 --- a/nym-ip-packet-client/src/connect.rs +++ b/nym-ip-packet-client/src/connect.rs @@ -30,8 +30,6 @@ enum ConnectionState { Disconnected, Connecting, Connected, - #[allow(unused)] - Disconnecting, } pub struct IprClientConnect { @@ -83,7 +81,7 @@ impl IprClientConnect { self.listen_for_connect_response(request_id).await } - async fn send_connect_request(&self, ip_packet_router_address: Recipient) -> Result { + async fn send_connect_request(&mut self, ip_packet_router_address: Recipient) -> Result { let (request, request_id) = IpPacketRequest::new_connect_request(None); // We use 20 surbs for the connect request because typically the IPR is configured to have diff --git a/nym-ip-packet-client/src/error.rs b/nym-ip-packet-client/src/error.rs index b88c332f674..f74ab688a49 100644 --- a/nym-ip-packet-client/src/error.rs +++ b/nym-ip-packet-client/src/error.rs @@ -18,6 +18,9 @@ pub enum Error { )] ReceivedResponseWithNewVersion { expected: u8, received: u8 }, + #[error("got reply for connect request, but it appears intended for the wrong address?")] + GotReplyIntendedForWrongAddress, + #[error("unexpected connect response")] UnexpectedConnectResponse, @@ -41,6 +44,10 @@ pub enum Error { #[error(transparent)] Bincode(#[from] bincode::Error), + #[error("failed to create connect request")] + FailedToCreateConnectRequest { + source: nym_ip_packet_requests::sign::SignatureError, + }, } // Result type based on our error type diff --git a/nym-network-monitor/src/handlers.rs b/nym-network-monitor/src/handlers.rs index 3b848fdcdbb..05f753cd89d 100644 --- a/nym-network-monitor/src/handlers.rs +++ b/nym-network-monitor/src/handlers.rs @@ -220,7 +220,7 @@ async fn send_receive_mixnet(state: AppState) -> Result { }); let send_handle = tokio::spawn(async move { - let mixnet_sender = sender.read().await.split_sender(); + let mut mixnet_sender = sender.read().await.split_sender(); let our_address = *sender.read().await.nym_address(); match timeout( Duration::from_secs(5), diff --git a/nym-outfox/src/lib.rs b/nym-outfox/src/lib.rs index e10d30e8465..0d278c06d72 100644 --- a/nym-outfox/src/lib.rs +++ b/nym-outfox/src/lib.rs @@ -1,3 +1,6 @@ +// MAX: temp ignore deprecated, can be dealt with in its own PR +#![allow(deprecated)] // silences clippy warning: deprecated associated function `chacha20::cipher::generic_array::GenericArray::::from_slice`: please upgrade to generic-array 1.x - TODO + pub mod constants; pub mod error; pub mod format; diff --git a/sdk/ffi/shared/src/lib.rs b/sdk/ffi/shared/src/lib.rs index eddd9b1f3e3..6f653156ddd 100644 --- a/sdk/ffi/shared/src/lib.rs +++ b/sdk/ffi/shared/src/lib.rs @@ -84,12 +84,12 @@ pub fn send_message_internal( message: &str, // TODO add Option, if Some(surb_amount) call send_message() instead with specified #, else send_plain_message as this uses the default ) -> Result<(), Error> { - let client = NYM_CLIENT.lock().expect("could not lock NYM_CLIENT"); + let mut client = NYM_CLIENT.lock().expect("could not lock NYM_CLIENT"); if client.is_none() { bail!("Client is not yet initialised"); } let nym_client = client - .as_ref() + .as_mut() .ok_or_else(|| anyhow!("could not get client as_ref()"))?; RUNTIME.block_on(async move { @@ -102,12 +102,12 @@ pub fn send_message_internal( // TODO send_raw_message_internal pub fn reply_internal(recipient: AnonymousSenderTag, message: &str) -> Result<(), Error> { - let client = NYM_CLIENT.lock().expect("could not lock NYM_CLIENT"); + let mut client = NYM_CLIENT.lock().expect("could not lock NYM_CLIENT"); if client.is_none() { bail!("Client is not yet initialised"); } let nym_client = client - .as_ref() + .as_mut() .ok_or_else(|| anyhow!("could not get client as_ref()"))?; RUNTIME.block_on(async move { diff --git a/sdk/rust/nym-sdk/Cargo.toml b/sdk/rust/nym-sdk/Cargo.toml index 523e5801c52..f218a4f74c3 100644 --- a/sdk/rust/nym-sdk/Cargo.toml +++ b/sdk/rust/nym-sdk/Cargo.toml @@ -15,13 +15,14 @@ name = "nym-proxy-client" path = "src/tcp_proxy/bin/proxy_client.rs" [dependencies] -async-trait = { workspace = true } -bip39 = { workspace = true } nym-client-core = { path = "../../../common/client-core", features = [ "fs-credentials-storage", "fs-surb-storage", "fs-gateways-storage", ] } +async-trait = { workspace = true } +bip39 = { workspace = true } + nym-crypto = { path = "../../../common/crypto" } nym-gateway-requests = { path = "../../../common/gateway-requests" } nym-bandwidth-controller = { path = "../../../common/bandwidth-controller" } @@ -46,6 +47,7 @@ nym-sphinx-addressing = { path = "../../../common/nymsphinx/addressing" } nym-bin-common = { path = "../../../common/bin-common", features = [ "basic_tracing", ] } + bytecodec = { workspace = true } httpcodec = { workspace = true } bytes = { workspace = true } @@ -61,6 +63,11 @@ url = { workspace = true } toml = { workspace = true } tempfile = { workspace = true } +nym-ip-packet-requests = { path = "../../../common/ip-packet-requests" } +pnet_packet = { workspace = true } +nym-config = { path = "../../../common/config" } + + # tcpproxy dependencies clap = { workspace = true, features = ["derive"] } anyhow.workspace = true diff --git a/sdk/rust/nym-sdk/examples/parallel_sending_and_receiving.rs b/sdk/rust/nym-sdk/examples/parallel_sending_and_receiving.rs index b42ad5ef048..38f7c526e6d 100644 --- a/sdk/rust/nym-sdk/examples/parallel_sending_and_receiving.rs +++ b/sdk/rust/nym-sdk/examples/parallel_sending_and_receiving.rs @@ -16,7 +16,7 @@ async fn main() { let our_address = *client.nym_address(); println!("Our client nym address is: {our_address}"); - let sender = client.split_sender(); + let mut sender = client.split_sender(); // receiving task let receiving_task_handle = tokio::spawn(async move { diff --git a/sdk/rust/nym-sdk/src/bandwidth/client.rs b/sdk/rust/nym-sdk/src/bandwidth/client.rs index e1ca5881856..fbe2eafbe66 100644 --- a/sdk/rust/nym-sdk/src/bandwidth/client.rs +++ b/sdk/rust/nym-sdk/src/bandwidth/client.rs @@ -29,6 +29,7 @@ where St: Storage + Clone, ::StorageError: Send + Sync + 'static, { + #[allow(clippy::result_large_err)] pub(crate) fn new( network_details: NymNetworkDetails, mnemonic: String, diff --git a/sdk/rust/nym-sdk/src/error.rs b/sdk/rust/nym-sdk/src/error.rs index af7706ec925..28b128daf1c 100644 --- a/sdk/rust/nym-sdk/src/error.rs +++ b/sdk/rust/nym-sdk/src/error.rs @@ -1,6 +1,9 @@ use nym_validator_client::nyxd::error::NyxdError; use std::path::PathBuf; +use nym_ip_packet_requests::v8::response::{ConnectFailureReason, IpPacketResponseData}; +use nym_validator_client::nym_api::error::NymAPIError; + /// Top-level Error enum for the mixnet client and its relevant types. #[derive(Debug, thiserror::Error)] pub enum Error { @@ -99,6 +102,63 @@ pub enum Error { #[error("Failed to get shutdown tracker from the task runtime registry: {0}")] RegistryAccess(#[from] nym_task::RegistryAccessError), + #[error("nymsphinx receiver error: {0}")] + MessageRecovery(#[from] nym_sphinx::receiver::MessageRecoveryError), + + #[error("client not connected")] + IprStreamClientNotConnected, + + #[error("client already connected or connecting")] + IprStreamClientAlreadyConnectedOrConnecting, + + #[error("trying to send an anonymous reply but peer surb tag is not set")] + MixStreamSurbTagNotSet, + + #[error("trying to send an outgoing message but receipient address is not set")] + MixStreamRecipientNotSet, + + #[error("listening for connection response timed out")] + IPRConnectResponseTimeout, + + #[error("no next frame: assuming stream is closed")] + IPRClientStreamClosed, + + #[error("expected control response, got {0:?}")] + UnexpectedResponseType(IpPacketResponseData), + + #[error("connect denied: {0:?}")] + ConnectDenied(ConnectFailureReason), + + #[allow(clippy::result_large_err)] + #[error("api directory error: {0}")] + GatewayDirectoryError(#[from] NymAPIError), + + #[error("did not receive Validator endpoint details")] + NoValidatorDetailsAvailable, + + #[error("did not receive URL")] + NoValidatorAPIUrl, + + #[error("did not receive NymVPN API URL")] + NoNymAPIUrl, + + #[error("no available gateway")] + NoGatewayAvailable, + + #[error("no IPR address on selected gateway")] + NoIPRAvailable, + + #[error("message version check failed: {0}")] + IPRMessageVersionCheckFailed(String), + + #[error("no response id found in connect response")] + IPRNoId, + + #[error("Could not find peer address or surb tag")] + MixStreamNoPeerOrSurb, + + #[error("No network env specified on new MixStream")] + MissingStreamConfig, } impl Error { diff --git a/sdk/rust/nym-sdk/src/mixnet/client.rs b/sdk/rust/nym-sdk/src/mixnet/client.rs index 5020c1bd5c8..a5a303ba25a 100644 --- a/sdk/rust/nym-sdk/src/mixnet/client.rs +++ b/sdk/rust/nym-sdk/src/mixnet/client.rs @@ -340,6 +340,7 @@ where } /// Construct a [`DisconnectedMixnetClient`] from the setup specified. + #[allow(clippy::result_large_err)] pub fn build(self) -> Result> { let mut client = DisconnectedMixnetClient::new( self.config, @@ -445,6 +446,7 @@ where /// Callers have the option of supplying further parameters to: /// - store persistent identities at a location on-disk, if desired; /// - use SOCKS5 mode + #[allow(clippy::result_large_err)] fn new( config: Config, socks5_config: Option, diff --git a/sdk/rust/nym-sdk/src/mixnet/native_client.rs b/sdk/rust/nym-sdk/src/mixnet/native_client.rs index d9d19008d1c..04dda121c36 100644 --- a/sdk/rust/nym-sdk/src/mixnet/native_client.rs +++ b/sdk/rust/nym-sdk/src/mixnet/native_client.rs @@ -2,9 +2,11 @@ use crate::mixnet::client::MixnetClientBuilder; use crate::mixnet::traits::MixnetMessageSender; use crate::{Error, Result}; use async_trait::async_trait; -use futures::{ready, Stream, StreamExt}; +use bytes::{Buf as _, BytesMut}; +use futures::{ready, FutureExt, Sink, SinkExt, Stream, StreamExt}; use log::{debug, error}; use nym_client_core::client::base_client::GatewayConnection; +use nym_client_core::client::inbound_messages::InputMessageCodec; use nym_client_core::client::mix_traffic::ClientRequestSender; use nym_client_core::client::{ base_client::{ClientInput, ClientOutput, ClientState}, @@ -15,15 +17,18 @@ use nym_client_core::config::{ForgetMe, RememberMe}; use nym_crypto::asymmetric::ed25519; use nym_gateway_requests::ClientRequest; use nym_sphinx::addressing::clients::Recipient; +use nym_sphinx::receiver::ReconstructedMessageCodec; use nym_sphinx::{params::PacketType, receiver::ReconstructedMessage}; use nym_statistics_common::clients::{ClientStatsEvents, ClientStatsSender}; use nym_task::connections::{ConnectionCommandSender, LaneQueueLengths}; use nym_task::ShutdownTracker; use nym_topology::{NymRouteProvider, NymTopology}; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::sync::Arc; use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::sync::RwLockReadGuard; +use tokio_util::codec::{Encoder, FramedRead}; use tokio_util::sync::CancellationToken; /// Client connected to the Nym mixnet. @@ -60,6 +65,24 @@ pub struct MixnetClient { _buffered: Vec, pub(crate) forget_me: ForgetMe, pub(crate) remember_me: RememberMe, + + // internal state used for the `AsyncRead` implementation + _read: ReadBuffer, +} + +#[derive(Debug, Default)] +struct ReadBuffer { + buffer: BytesMut, +} + +impl ReadBuffer { + fn clear(&mut self) { + self.buffer.clear(); + } + + fn pending(&self) -> bool { + !self.buffer.is_empty() + } } impl MixnetClient { @@ -90,6 +113,7 @@ impl MixnetClient { _buffered: Vec::new(), forget_me, remember_me, + _read: ReadBuffer::default(), } } @@ -280,6 +304,25 @@ impl MixnetClient { } } } + + fn read_buffer_to_slice( + &mut self, + buf: &mut ReadBuf, + cx: &mut Context<'_>, + ) -> Poll> { + if self._read.buffer.len() < buf.capacity() { + // let written = self._read.buffer.len(); + buf.put_slice(&self._read.buffer); + self._read.clear(); + Poll::Ready(Ok(())) + } else { + let written = buf.capacity(); + buf.put_slice(&self._read.buffer[..written]); + self._read.buffer.advance(written); + cx.waker().wake_by_ref(); + Poll::Ready(Ok(())) + } + } } #[derive(Clone)] @@ -288,6 +331,193 @@ pub struct MixnetClientSender { packet_type: Option, } +impl AsyncRead for MixnetClient { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf, + ) -> Poll> { + let mut codec = ReconstructedMessageCodec {}; + + if self._read.pending() { + return self.read_buffer_to_slice(buf, cx); + } + + let msg = match self.as_mut().poll_next(cx) { + Poll::Ready(Some(msg)) => msg, + Poll::Ready(None) => return Poll::Ready(Ok(())), + Poll::Pending => return Poll::Pending, + }; + + match codec.encode(msg, &mut self._read.buffer) { + Ok(_) => {} + Err(e) => { + error!("failed to encode reconstructed message: {:?}", e); + return Poll::Ready(Err(tokio::io::Error::other( + "failed to encode reconstructed message", + ))); + } + }; + + self.read_buffer_to_slice(buf, cx) + } +} + +impl AsyncWrite for MixnetClient { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let codec = InputMessageCodec {}; + let mut reader = FramedRead::new(buf, codec); + let mut fut = reader.next(); + let msg = match fut.poll_unpin(cx) { + Poll::Ready(Some(Ok(msg))) => msg, + Poll::Ready(Some(Err(_))) => { + return Poll::Ready(Err(std::io::Error::other( + "failed to read message from input", + ))) + } + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(Ok(0)), + }; + + let msg_size = msg.serialized_size(); + + let mut fut = pin!(self.client_input.send(msg)); + match fut.poll_unpin(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(msg_size as usize)), + Poll::Ready(Err(_)) => Poll::Ready(Err(std::io::Error::other( + "failed to send message to mixnet", + ))), + Poll::Pending => Poll::Pending, + } + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Sink::poll_flush(self, cx).map_err(|_| std::io::Error::other("failed to flush the sink")) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + AsyncWrite::poll_flush(self, cx) + } +} + +impl Sink for MixnetClient { + type Error = Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.sender().poll_ready_unpin(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(_)) => Poll::Ready(Err(Error::MessageSendingFailure)), + Poll::Pending => Poll::Pending, + } + } + + fn start_send(mut self: Pin<&mut Self>, item: InputMessage) -> Result<()> { + self.sender() + .start_send_unpin(item) + .map_err(|_| Error::MessageSendingFailure) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.sender() + .poll_flush_unpin(cx) + .map_err(|_| Error::MessageSendingFailure) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.sender() + .poll_close_unpin(cx) + .map_err(|_| Error::MessageSendingFailure) + } +} + +// TODO: there should be a better way of implementing Sink and AsyncWrite over T: MixnetMessageSender +impl AsyncWrite for MixnetClientSender { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let codec = InputMessageCodec {}; + let mut reader = FramedRead::new(buf, codec); + let mut fut = reader.next(); + let msg = match fut.poll_unpin(cx) { + Poll::Ready(Some(Ok(msg))) => msg, + Poll::Ready(Some(Err(_))) => { + return Poll::Ready(Err(std::io::Error::other( + "failed to read message from input", + ))) + } + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(Ok(0)), + }; + + let msg_size = msg.serialized_size(); + + let mut fut = pin!(self.client_input.send(msg)); + match fut.poll_unpin(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(msg_size as usize)), + Poll::Ready(Err(_)) => Poll::Ready(Err(std::io::Error::other( + "failed to send message to mixnet", + ))), + Poll::Pending => Poll::Pending, + } + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Sink::poll_flush(self, cx).map_err(|_| std::io::Error::other("failed to flush the sink")) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + AsyncWrite::poll_flush(self, cx) + } +} + +impl Sink for MixnetClientSender { + type Error = Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.sender().poll_ready_unpin(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(_)) => Poll::Ready(Err(Error::MessageSendingFailure)), + Poll::Pending => Poll::Pending, + } + } + + fn start_send(mut self: Pin<&mut Self>, item: InputMessage) -> Result<()> { + self.sender() + .start_send_unpin(item) + .map_err(|_| Error::MessageSendingFailure) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.sender() + .poll_flush_unpin(cx) + .map_err(|_| Error::MessageSendingFailure) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.sender() + .poll_close_unpin(cx) + .map_err(|_| Error::MessageSendingFailure) + } +} + impl Stream for MixnetClient { type Item = ReconstructedMessage; @@ -326,12 +556,16 @@ impl MixnetMessageSender for MixnetClient { self.packet_type } - async fn send(&self, message: InputMessage) -> Result<()> { + async fn send(&mut self, message: InputMessage) -> Result<()> { self.client_input .send(message) .await .map_err(|_| Error::MessageSendingFailure) } + + fn sender(&mut self) -> &mut tokio_util::sync::PollSender { + &mut self.client_input.input_sender + } } #[async_trait] @@ -340,10 +574,14 @@ impl MixnetMessageSender for MixnetClientSender { self.packet_type } - async fn send(&self, message: InputMessage) -> Result<()> { + async fn send(&mut self, message: InputMessage) -> Result<()> { self.client_input .send(message) .await .map_err(|_| Error::MessageSendingFailure) } + + fn sender(&mut self) -> &mut tokio_util::sync::PollSender { + &mut self.client_input.input_sender + } } diff --git a/sdk/rust/nym-sdk/src/mixnet/paths.rs b/sdk/rust/nym-sdk/src/mixnet/paths.rs index 5f8d60a7de2..adcb5b91585 100644 --- a/sdk/rust/nym-sdk/src/mixnet/paths.rs +++ b/sdk/rust/nym-sdk/src/mixnet/paths.rs @@ -53,6 +53,7 @@ impl StoragePaths { /// /// This function will return an error if it is passed a path to an existing file instead of a /// directory. + #[allow(clippy::result_large_err)] pub fn new_from_dir>(dir: P) -> Result { let dir = dir.as_ref(); if dir.is_file() { diff --git a/sdk/rust/nym-sdk/src/mixnet/sink.rs b/sdk/rust/nym-sdk/src/mixnet/sink.rs index a3b8fd8adab..9bf40a8aef6 100644 --- a/sdk/rust/nym-sdk/src/mixnet/sink.rs +++ b/sdk/rust/nym-sdk/src/mixnet/sink.rs @@ -26,6 +26,7 @@ const SINK_BUFFER_SIZE_IN_MESSAGES: usize = 8; /// Traits that represents the ability to convert bytes into InputMessages that can be sent to the /// mixnet. This is typically used to set the destination and other sending parameters. pub trait MixnetMessageSinkTranslator: Unpin { + #[allow(clippy::result_large_err)] fn to_input_message(&self, bytes: &[u8]) -> Result; } @@ -175,7 +176,7 @@ where } fn start_sender_task( - mixnet_client_sender: Sender, + mut mixnet_client_sender: Sender, ) -> (mpsc::Sender, JoinHandle<()>) where Sender: MixnetMessageSender + Send + 'static, diff --git a/sdk/rust/nym-sdk/src/mixnet/traits.rs b/sdk/rust/nym-sdk/src/mixnet/traits.rs index 8eb78109f7a..d8649dfa7b2 100644 --- a/sdk/rust/nym-sdk/src/mixnet/traits.rs +++ b/sdk/rust/nym-sdk/src/mixnet/traits.rs @@ -16,9 +16,11 @@ pub trait MixnetMessageSender { None } + fn sender(&mut self) -> &mut tokio_util::sync::PollSender; + /// Sends a [`InputMessage`] to the mixnet. This is the most low-level sending function, for /// full customization. - async fn send(&self, message: InputMessage) -> Result<()>; + async fn send(&mut self, message: InputMessage) -> Result<()>; /// Sends data to the supplied Nym address with the default surb behaviour. /// @@ -35,7 +37,7 @@ pub trait MixnetMessageSender { /// client.send_plain_message(recipient, "hi").await.unwrap(); /// } /// ``` - async fn send_plain_message(&self, address: Recipient, message: M) -> Result<()> + async fn send_plain_message(&mut self, address: Recipient, message: M) -> Result<()> where M: AsRef<[u8]> + Send, { @@ -61,7 +63,7 @@ pub trait MixnetMessageSender { /// } /// ``` async fn send_message( - &self, + &mut self, address: Recipient, message: M, surbs: IncludedSurbs, @@ -103,7 +105,7 @@ pub trait MixnetMessageSender { /// client.send_reply(tag, b"hi").await.unwrap(); /// } /// ``` - async fn send_reply(&self, recipient_tag: AnonymousSenderTag, message: M) -> Result<()> + async fn send_reply(&mut self, recipient_tag: AnonymousSenderTag, message: M) -> Result<()> where M: AsRef<[u8]> + Send, { diff --git a/sdk/rust/nym-sdk/src/tcp_proxy.rs b/sdk/rust/nym-sdk/src/tcp_proxy.rs index a622e3f6cf1..ddc843adaee 100644 --- a/sdk/rust/nym-sdk/src/tcp_proxy.rs +++ b/sdk/rust/nym-sdk/src/tcp_proxy.rs @@ -163,7 +163,7 @@ //! let codec = BytesCodec::new(); //! let mut framed_read = FramedRead::new(read, codec); //! // Much like the tcpstream, split our Nym client into a sender and receiver for concurrent read/write -//! let sender = client.split_sender(); +//! let mut sender = client.split_sender(); //! // The server / service provider address our client is sending messages to will remain static //! let server_addr = server_address; //! // Store outgoing messages in instance of Dashset abstraction diff --git a/sdk/rust/nym-sdk/src/tcp_proxy/tcp_proxy_client.rs b/sdk/rust/nym-sdk/src/tcp_proxy/tcp_proxy_client.rs index f00a5b30c14..4721f4e4cdc 100644 --- a/sdk/rust/nym-sdk/src/tcp_proxy/tcp_proxy_client.rs +++ b/sdk/rust/nym-sdk/src/tcp_proxy/tcp_proxy_client.rs @@ -161,7 +161,7 @@ impl NymProxyClient { let codec = BytesCodec::new(); let mut framed_read = FramedRead::new(read, codec); // Much like the tcpstream, split our Nym client into a sender and receiver for concurrent read/write - let sender = client.split_sender(); + let mut sender = client.split_sender(); // The server / service provider address our client is sending messages to will remain static let server_addr = server_address; // Store outgoing messages in instance of Dashset abstraction diff --git a/sdk/rust/nym-sdk/src/tcp_proxy/tcp_proxy_server.rs b/sdk/rust/nym-sdk/src/tcp_proxy/tcp_proxy_server.rs index a45ca513632..61f2d232d48 100644 --- a/sdk/rust/nym-sdk/src/tcp_proxy/tcp_proxy_server.rs +++ b/sdk/rust/nym-sdk/src/tcp_proxy/tcp_proxy_server.rs @@ -217,7 +217,7 @@ impl NymProxyServer { sender .write() .await - .send_reply(surb, bincode::serialize(&reply)?) + .send_reply(surb, &bincode::serialize(&reply)?) .await? } info!( diff --git a/service-providers/ip-packet-router/src/clients/connected_client_handler.rs b/service-providers/ip-packet-router/src/clients/connected_client_handler.rs index ba9f9b32901..d047819d315 100644 --- a/service-providers/ip-packet-router/src/clients/connected_client_handler.rs +++ b/service-providers/ip-packet-router/src/clients/connected_client_handler.rs @@ -219,14 +219,14 @@ impl MixnetMessageSinkTranslator for ToIprDataResponse { #[cfg(test)] mod tests { - use std::sync::{Arc, Mutex}; - use async_trait::async_trait; use bytes::Bytes; use futures::SinkExt; use nym_sdk::mixnet::{AnonymousSenderTag, MixnetMessageSender, MixnetMessageSink}; + use std::sync::{Arc, Mutex}; use tokio::sync::Notify; use tokio_util::codec::FramedWrite; + use tokio_util::sync::PollSender; use super::*; @@ -264,12 +264,15 @@ mod tests { #[async_trait] impl MixnetMessageSender for MockMixnetClientSender { - async fn send(&self, message: InputMessage) -> std::result::Result<(), nym_sdk::Error> { + async fn send(&mut self, message: InputMessage) -> std::result::Result<(), nym_sdk::Error> { let mut sent_messages = self.sent_messages.lock().unwrap(); sent_messages.push(message); self.notify.notify_one(); Ok(()) } + fn sender(&mut self) -> &mut PollSender { + todo!() + } } #[tokio::test] diff --git a/service-providers/ip-packet-router/src/mixnet_listener.rs b/service-providers/ip-packet-router/src/mixnet_listener.rs index c0f36927bcf..e9ee8087b3e 100644 --- a/service-providers/ip-packet-router/src/mixnet_listener.rs +++ b/service-providers/ip-packet-router/src/mixnet_listener.rs @@ -430,7 +430,7 @@ impl MixnetListener { // When an incoming mixnet message triggers a response that we send back, such as during // connect handshake. - async fn handle_response(&self, response: VersionedResponse) -> Result<()> { + async fn handle_response(&mut self, response: VersionedResponse) -> Result<()> { let send_to = response.reply_to.clone(); let response_bytes = response.try_into_bytes()?; let input_message = @@ -445,7 +445,7 @@ impl MixnetListener { // A single incoming request can trigger multiple responses, such as when data requests contain // multiple IP packets. - async fn handle_responses(&self, responses: Vec) { + async fn handle_responses(&mut self, responses: Vec) { for response in responses { match response { Ok(Some(response)) => { diff --git a/service-providers/network-requester/Cargo.toml b/service-providers/network-requester/Cargo.toml index 1731d38bef2..53b99a4de60 100644 --- a/service-providers/network-requester/Cargo.toml +++ b/service-providers/network-requester/Cargo.toml @@ -37,14 +37,23 @@ tap = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["net", "rt-multi-thread", "macros"] } tokio-tungstenite = { workspace = true } +tokio-util = { workspace = true } url = { workspace = true } time = { workspace = true } zeroize = { workspace = true } # internal + + nym-async-file-watcher = { path = "../../common/async-file-watcher" } -nym-bin-common = { path = "../../common/bin-common", features = ["output_format", "clap", "basic_tracing"] } -nym-client-core = { path = "../../common/client-core", features = ["cli", "fs-gateways-storage", "fs-surb-storage"] } +nym-client-core = { path = "../../common/client-core", features = [ + "cli", + "fs-gateways-storage", + "fs-surb-storage", +] } +nym-bin-common = { path = "../../common/bin-common", features = [ + "output_format", +] } nym-client-websocket-requests = { path = "../../clients/native/websocket-requests" } nym-config = { path = "../../common/config" } nym-credentials = { path = "../../common/credentials" } diff --git a/service-providers/network-requester/src/core.rs b/service-providers/network-requester/src/core.rs index 6ca9b3b588c..9b6cf866552 100644 --- a/service-providers/network-requester/src/core.rs +++ b/service-providers/network-requester/src/core.rs @@ -7,6 +7,7 @@ use crate::reply::MixnetMessage; use crate::request_filter::RequestFilter; use crate::{reply, socks5}; use async_trait::async_trait; +use futures::SinkExt; use futures::channel::{mpsc, oneshot}; use futures::stream::StreamExt; use log::{debug, error, warn}; @@ -15,7 +16,7 @@ use nym_client_core::HardcodedTopologyProvider; use nym_client_core::client::mix_traffic::transceiver::GatewayTransceiver; use nym_client_core::config::disk_persistence::CommonClientPaths; use nym_network_defaults::NymNetworkDetails; -use nym_sdk::mixnet::{MixnetMessageSender, TopologyProvider}; +use nym_sdk::mixnet::TopologyProvider; use nym_service_providers_common::ServiceProvider; use nym_service_providers_common::interface::{ BinaryInformation, ProviderInterfaceVersion, Request, RequestVersion, @@ -37,6 +38,7 @@ use nym_task::ShutdownTracker; use nym_task::connections::LaneQueueLengths; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; +use tokio_util::sync::PollSender; // Since it's an atomic, it's safe to be kept static and shared across threads static ACTIVE_PROXIES: AtomicUsize = AtomicUsize::new(0); @@ -240,6 +242,8 @@ impl NRServiceProviderBuilder { // going to be used by `mixnet_response_listener` let (mix_input_sender, mix_input_receiver) = tokio::sync::mpsc::channel::(1); + let mix_input_sender = PollSender::new(mix_input_sender); + // Controller for managing all active connections. let (mut active_connections_controller, controller_sender) = Controller::new( mixnet_client.connection_command_sender(), @@ -337,7 +341,7 @@ impl NRServiceProvider { /// Listens for any messages from `mix_reader` that should be written back to the mix network /// via the `websocket_writer`. async fn mixnet_response_listener( - mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender, + mut mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender, mut mix_input_reader: MixProxyReader, packet_type: PacketType, ) { @@ -346,7 +350,7 @@ impl NRServiceProvider { socks5_msg = mix_input_reader.recv() => { if let Some(msg) = socks5_msg { let response_message = msg.into_input_message(packet_type); - mixnet_client_sender.send(response_message).await.unwrap(); + nym_sdk::mixnet::MixnetMessageSender::send(&mut mixnet_client_sender, response_message).await.unwrap(); } else { log::error!("Exiting: channel closed!"); break; @@ -364,7 +368,7 @@ impl NRServiceProvider { return_address: reply::MixnetAddress, biggest_packet_size: PacketSize, controller_sender: ControllerSender, - mix_input_sender: MixProxySender, + mut mix_input_sender: MixProxySender, lane_queue_lengths: LaneQueueLengths, shutdown: ShutdownTracker, ) { @@ -457,7 +461,7 @@ impl NRServiceProvider { .unwrap_or(traffic_config.primary_packet_size); let controller_sender_clone = self.controller_sender.clone(); - let mix_input_sender_clone = self.mix_input_sender.clone(); + let mut mix_input_sender_clone = self.mix_input_sender.clone(); let lane_queue_lengths_clone = self.mixnet_client.shared_lane_queue_lengths(); // we're just cloning the underlying pointer, nothing expensive is happening here diff --git a/tools/echo-server/src/lib.rs b/tools/echo-server/src/lib.rs index 4759f6086ac..801164e699c 100644 --- a/tools/echo-server/src/lib.rs +++ b/tools/echo-server/src/lib.rs @@ -344,7 +344,7 @@ mod tests { let mut client = MixnetClient::connect_new().await?; println!("sending client addr {}", client.nym_address()); - let sender = client.split_sender(); + let mut sender = client.split_sender(); let receiving_task_handle = tokio::spawn(async move { println!("in handle"); diff --git a/wasm/client/Cargo.toml b/wasm/client/Cargo.toml index 4b5d7d19e90..66d19dc2878 100644 --- a/wasm/client/Cargo.toml +++ b/wasm/client/Cargo.toml @@ -31,6 +31,7 @@ once_cell = { workspace = true } thiserror = { workspace = true } tsify = { workspace = true, features = ["js"] } web-sys = { workspace = true } +tokio = { workspace = true, default-features = false, features = ["sync"] } nym-bin-common = { path = "../../common/bin-common" } wasm-client-core = { path = "../../common/wasm/client-core" } diff --git a/wasm/client/src/client.rs b/wasm/client/src/client.rs index 8c006b7f09f..4d886b7cee3 100644 --- a/wasm/client/src/client.rs +++ b/wasm/client/src/client.rs @@ -16,7 +16,7 @@ use nym_bin_common::bin_info; use nym_gateway_requests::ClientRequest; use serde::{Deserialize, Serialize}; use std::sync::Arc; -use tokio_with_wasm::sync::mpsc; +use tokio_with_wasm::sync::{mpsc, RwLock}; use tsify::Tsify; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::future_to_promise; @@ -54,7 +54,7 @@ pub type ClientRequestSender = mpsc::Sender; #[wasm_bindgen] pub struct NymClient { self_address: String, - client_input: Arc, + client_input: Arc>, client_state: Arc, // keep track of the "old" topology for the purposes of node tester @@ -253,7 +253,7 @@ impl NymClientBuilder { Ok(NymClient { self_address, - client_input: Arc::new(client_input), + client_input: Arc::new(RwLock::new(client_input)), client_state: Arc::new(started_client.client_state), _full_topology: None, _task_manager: started_client.shutdown_handle, diff --git a/wasm/client/src/helpers.rs b/wasm/client/src/helpers.rs index b6cdba3d33c..eb98970438b 100644 --- a/wasm/client/src/helpers.rs +++ b/wasm/client/src/helpers.rs @@ -1,8 +1,10 @@ // Copyright 2022-2023 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 +use futures::SinkExt; use js_sys::Promise; use std::sync::Arc; +use tokio_with_wasm::sync::RwLock; use wasm_bindgen::JsValue; use wasm_bindgen_futures::future_to_promise; use wasm_client_core::client::base_client::{ClientInput, ClientState}; @@ -48,10 +50,11 @@ pub(crate) trait InputSender { fn send_messages(&self, messages: Vec) -> Promise; } -impl InputSender for Arc { +impl InputSender for Arc> { fn send_message(&self, message: InputMessage) -> Promise { let this = Arc::clone(self); future_to_promise(async move { + let mut this = this.write().await; match this.input_sender.send(message).await { Ok(_) => Ok(JsValue::null()), Err(_) => Err(simple_js_error( @@ -64,6 +67,7 @@ impl InputSender for Arc { fn send_messages(&self, messages: Vec) -> Promise { let this = Arc::clone(self); future_to_promise(async move { + let mut this = this.write().await; for message in messages { if this.input_sender.send(message).await.is_err() { return Err(simple_js_error( diff --git a/wasm/mix-fetch/src/client.rs b/wasm/mix-fetch/src/client.rs index 736c82c4139..60a51f03c6c 100644 --- a/wasm/mix-fetch/src/client.rs +++ b/wasm/mix-fetch/src/client.rs @@ -8,11 +8,14 @@ use crate::go_bridge::goWasmSetMixFetchRequestTimeout; use crate::request_writer::RequestWriter; use crate::socks_helpers::{socks5_connect_request, socks5_data_request}; use crate::{config, RequestId}; +use futures::SinkExt; use js_sys::Promise; use nym_bin_common::bin_info; use nym_socks5_requests::RemoteAddress; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use tokio::sync::Mutex; +use tokio::sync::RwLock; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::future_to_promise; use wasm_client_core::client::base_client::storage::GatewaysDetailsStore; @@ -36,7 +39,7 @@ pub struct MixFetchClient { self_address: Recipient, - client_input: ClientInput, + client_input: Arc>, requests: ActiveRequests, @@ -185,7 +188,7 @@ impl MixFetchClientBuilder { invalidated: AtomicBool::new(false), mix_fetch_config: self.config.mix_fetch, self_address, - client_input, + client_input: Arc::new(RwLock::new(client_input)), requests: active_requests, _shutdown_manager: Mutex::new(started_client.shutdown_handle), }) @@ -261,6 +264,8 @@ impl MixFetchClient { // the expect here is fine as it implies an unrecoverable failure since one of the client core // tasks has terminated self.client_input + .write() + .await .input_sender .send(input) .await @@ -288,6 +293,8 @@ impl MixFetchClient { // the expect here is fine as it implies an unrecoverable failure since one of the client core // tasks has terminated self.client_input + .write() + .await .input_sender .send(input) .await