diff --git a/Cargo.lock b/Cargo.lock index 9974f92fa7e..f21e6b9cc9b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4138,6 +4138,7 @@ version = "1.1.15" dependencies = [ "async-trait", "base64 0.21.7", + "bincode", "bs58 0.5.1", "cfg-if", "clap 4.5.4", @@ -4180,6 +4181,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-tungstenite", + "tokio-util", "tungstenite", "url", "wasm-bindgen", @@ -4253,6 +4255,7 @@ dependencies = [ "serde-wasm-bindgen 0.6.5", "serde_json", "thiserror", + "tokio", "tsify", "wasm-bindgen", "wasm-bindgen-futures", @@ -4956,6 +4959,7 @@ dependencies = [ "time", "tokio", "tokio-tungstenite", + "tokio-util", "url", "zeroize", ] @@ -5300,6 +5304,7 @@ dependencies = [ "tap", "thiserror", "tokio", + "tokio-util", "url", ] @@ -5359,6 +5364,7 @@ dependencies = [ name = "nym-sphinx" version = "0.1.0" dependencies = [ + "bincode", "log", "nym-crypto", "nym-mixnet-contract-common", @@ -5375,8 +5381,10 @@ dependencies = [ "nym-topology", "rand 0.8.5", "rand_distr", + "serde", "thiserror", "tokio", + "tokio-util", ] [[package]] @@ -5463,6 +5471,7 @@ dependencies = [ "nym-sphinx-addressing", "nym-sphinx-params", "nym-sphinx-types", + "serde", "thiserror", ] @@ -5540,6 +5549,7 @@ version = "0.1.0" dependencies = [ "futures", "log", + "serde", "thiserror", "tokio", "wasm-bindgen", diff --git a/common/client-core/Cargo.toml b/common/client-core/Cargo.toml index 6462e98154f..a3c877be83f 100644 --- a/common/client-core/Cargo.toml +++ b/common/client-core/Cargo.toml @@ -10,6 +10,7 @@ license.workspace = true [dependencies] async-trait = { workspace = true } +bincode = { workspace = true } base64 = "0.21.2" bs58 = { workspace = true } cfg-if = { workspace = true } @@ -26,6 +27,7 @@ tap = { workspace = true } thiserror = { workspace = true } url = { workspace = true, features = ["serde"] } tokio = { workspace = true, features = ["macros"] } +tokio-util = { workspace = true, features = ["codec"] } time = { workspace = true } zeroize = { workspace = true } @@ -47,7 +49,9 @@ nym-validator-client = { path = "../client-libs/validator-client", default-featu nym-task = { path = "../task" } nym-credential-storage = { path = "../credential-storage" } nym-network-defaults = { path = "../network-defaults" } -nym-client-core-config-types = { path = "./config-types", features = ["disk-persistence"] } +nym-client-core-config-types = { path = "./config-types", features = [ + "disk-persistence", +] } nym-client-core-surb-storage = { path = "./surb-storage" } nym-client-core-gateways-storage = { path = "./gateways-storage" } diff --git a/common/client-core/src/client/base_client/mod.rs b/common/client-core/src/client/base_client/mod.rs index 3d159fb4857..66bbdc31915 100644 --- a/common/client-core/src/client/base_client/mod.rs +++ b/common/client-core/src/client/base_client/mod.rs @@ -35,6 +35,7 @@ use crate::init::{ }; use crate::{config, spawn_future}; use futures::channel::mpsc; +use futures::SinkExt; use log::{debug, error, info, warn}; use nym_bandwidth_controller::BandwidthController; use nym_client_core_gateways_storage::{GatewayDetails, GatewaysDetailsStore}; @@ -59,6 +60,7 @@ use std::fmt::Debug; use std::os::raw::c_int as RawFd; use std::path::Path; use std::sync::Arc; +use tokio_util::sync::{PollSendError, PollSender}; use url::Url; #[cfg(all( @@ -78,10 +80,7 @@ pub struct ClientInput { } impl ClientInput { - pub async fn send( - &self, - message: InputMessage, - ) -> Result<(), tokio::sync::mpsc::error::SendError> { + pub async fn send(&mut self, message: InputMessage) -> Result<(), PollSendError> { self.input_sender.send(message).await } } @@ -804,7 +803,7 @@ where client_input: ClientInputStatus::AwaitingProducer { client_input: ClientInput { connection_command_sender: client_connection_tx, - input_sender, + input_sender: PollSender::new(input_sender), }, }, client_output: ClientOutputStatus::AwaitingConsumer { diff --git a/common/client-core/src/client/inbound_messages.rs b/common/client-core/src/client/inbound_messages.rs index baf163913f5..52928d5de68 100644 --- a/common/client-core/src/client/inbound_messages.rs +++ b/common/client-core/src/client/inbound_messages.rs @@ -1,16 +1,26 @@ // Copyright 2020-2023 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 - use nym_sphinx::addressing::clients::Recipient; use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag; use nym_sphinx::forwarding::packet::MixPacket; use nym_sphinx::params::PacketType; use nym_task::connections::TransmissionLane; +use serde::{Deserialize, Serialize}; +use std::convert::TryInto; +use tokio_util::{ + bytes::Buf, + bytes::BytesMut, + codec::{Decoder, Encoder}, +}; + +use crate::error::ClientCoreError; -pub type InputMessageSender = tokio::sync::mpsc::Sender; +pub type InputMessageSender = tokio_util::sync::PollSender; pub type InputMessageReceiver = tokio::sync::mpsc::Receiver; -#[derive(Debug)] +const LENGHT_ENCODING_PREFIX_SIZE: usize = 4; + +#[derive(Serialize, Deserialize, Debug)] pub enum InputMessage { /// Fire an already prepared mix packets into the network. /// No guarantees are made about it. For example no retransmssion @@ -64,6 +74,10 @@ pub enum InputMessage { } impl InputMessage { + pub fn simple(data: &[u8], recipient: Recipient) -> Self { + InputMessage::new_regular(recipient, data.to_vec(), TransmissionLane::General, None) + } + pub fn new_premade( msgs: Vec, lane: TransmissionLane, @@ -197,4 +211,70 @@ impl InputMessage { InputMessage::MessageWrapper { message, .. } => message.lane(), } } + + pub fn serialized_size(&self) -> u64 { + bincode::serialized_size(self).expect("failed to get serialized InputMessage size") + + LENGHT_ENCODING_PREFIX_SIZE as u64 + } +} + +// TODO: Tests +pub struct AdressedInputMessageCodec(pub Recipient); + +impl Encoder<&[u8]> for AdressedInputMessageCodec { + type Error = ClientCoreError; + + fn encode(&mut self, item: &[u8], buf: &mut BytesMut) -> Result<(), Self::Error> { + let mut codec = InputMessageCodec; + let input_message = InputMessage::simple(item, self.0.clone()); + codec.encode(input_message, buf)?; + Ok(()) + } +} + +pub struct InputMessageCodec; + +impl Encoder for InputMessageCodec { + type Error = ClientCoreError; + + fn encode(&mut self, item: InputMessage, buf: &mut BytesMut) -> Result<(), Self::Error> { + let encoded = bincode::serialize(&item).expect("failed to serialize InputMessage"); + let encoded_len = encoded.len() as u32; + let mut encoded_with_len = encoded_len.to_le_bytes().to_vec(); + encoded_with_len.extend(encoded); + buf.reserve(encoded_with_len.len()); + buf.extend_from_slice(&encoded_with_len); + Ok(()) + } +} + +impl Decoder for InputMessageCodec { + type Item = InputMessage; + type Error = ClientCoreError; + + fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { + if buf.len() < LENGHT_ENCODING_PREFIX_SIZE { + return Ok(None); + } + + let len = u32::from_le_bytes( + buf[0..LENGHT_ENCODING_PREFIX_SIZE] + .try_into() + .expect("Could not coarce to array"), + ) as usize; + if buf.len() < len + LENGHT_ENCODING_PREFIX_SIZE { + return Ok(None); + } + + let decoded = match bincode::deserialize( + &buf[LENGHT_ENCODING_PREFIX_SIZE..len + LENGHT_ENCODING_PREFIX_SIZE], + ) { + Ok(decoded) => decoded, + Err(_) => return Ok(None), + }; + + buf.advance(len + LENGHT_ENCODING_PREFIX_SIZE); + + Ok(Some(decoded)) + } } diff --git a/common/nymsphinx/Cargo.toml b/common/nymsphinx/Cargo.toml index 40769d2ec6b..0b8864d4859 100644 --- a/common/nymsphinx/Cargo.toml +++ b/common/nymsphinx/Cargo.toml @@ -12,6 +12,9 @@ log = { workspace = true } rand = { workspace = true } rand_distr = { workspace = true } thiserror = { workspace = true } +serde = { workspace = true } +bincode = { workspace = true } +tokio-util = { workspace = true, features = ["codec"] } nym-sphinx-acknowledgements = { path = "acknowledgements" } nym-sphinx-addressing = { path = "addressing" } @@ -30,7 +33,9 @@ nym-topology = { path = "../topology" } [dev-dependencies] nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" } -nym-crypto = { path = "../crypto", version = "0.4.0", features = ["asymmetric"] } +nym-crypto = { path = "../crypto", version = "0.4.0", features = [ + "asymmetric", +] } # do not include this when compiling into wasm as it somehow when combined together with reqwest, it will require # net2 via tokio-util -> tokio -> mio -> net2 @@ -43,5 +48,13 @@ features = ["sync"] [features] default = ["sphinx"] -sphinx = ["nym-crypto/sphinx", "nym-sphinx-params/sphinx", "nym-sphinx-types/sphinx"] -outfox = ["nym-crypto/outfox", "nym-sphinx-params/outfox", "nym-sphinx-types/outfox"] +sphinx = [ + "nym-crypto/sphinx", + "nym-sphinx-params/sphinx", + "nym-sphinx-types/sphinx", +] +outfox = [ + "nym-crypto/outfox", + "nym-sphinx-params/outfox", + "nym-sphinx-types/outfox", +] diff --git a/common/nymsphinx/anonymous-replies/src/requests.rs b/common/nymsphinx/anonymous-replies/src/requests.rs index 9dd4c84dc04..10695439268 100644 --- a/common/nymsphinx/anonymous-replies/src/requests.rs +++ b/common/nymsphinx/anonymous-replies/src/requests.rs @@ -4,6 +4,7 @@ use crate::{ReplySurb, ReplySurbError}; use nym_sphinx_addressing::clients::{Recipient, RecipientFormattingError}; use rand::{CryptoRng, RngCore}; +use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; use std::mem; use thiserror::Error; @@ -24,7 +25,7 @@ pub enum InvalidAnonymousSenderTagRepresentation { InvalidLength { received: usize, expected: usize }, } -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)] #[cfg_attr(target_arch = "wasm32", wasm_bindgen)] pub struct AnonymousSenderTag([u8; SENDER_TAG_SIZE]); diff --git a/common/nymsphinx/forwarding/Cargo.toml b/common/nymsphinx/forwarding/Cargo.toml index c3f3e0dc9f3..efa5b2e1fb2 100644 --- a/common/nymsphinx/forwarding/Cargo.toml +++ b/common/nymsphinx/forwarding/Cargo.toml @@ -13,3 +13,4 @@ nym-sphinx-params = { path = "../params" } nym-sphinx-types = { path = "../types", features = ["sphinx", "outfox"] } nym-outfox = { path = "../../../nym-outfox" } thiserror = { workspace = true } +serde = { workspace = true } \ No newline at end of file diff --git a/common/nymsphinx/forwarding/src/packet.rs b/common/nymsphinx/forwarding/src/packet.rs index 97b1419aa1c..534f6b64dd4 100644 --- a/common/nymsphinx/forwarding/src/packet.rs +++ b/common/nymsphinx/forwarding/src/packet.rs @@ -4,6 +4,10 @@ use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, NymNodeRoutingAddressError}; use nym_sphinx_params::{PacketSize, PacketType}; use nym_sphinx_types::{NymPacket, NymPacketError}; +use serde::{ + de::{self, Visitor}, + Deserialize, Deserializer, Serialize, Serializer, +}; use std::fmt::{self, Debug, Formatter}; use thiserror::Error; @@ -110,6 +114,39 @@ impl MixPacket { .chain(self.packet.to_bytes()?) .collect()) } + + pub fn to_bytes(&self) -> Result, MixPacketFormattingError> { + Ok(std::iter::once(self.packet_type as u8) + .chain(self.next_hop.as_bytes()) + .chain(self.packet.to_bytes()?) + .collect()) + } +} + +impl Serialize for MixPacket { + fn serialize(&self, serializer: S) -> Result { + serializer.serialize_bytes(&self.to_bytes().map_err(serde::ser::Error::custom)?) + } +} + +struct MixPacketVisitor; + +impl<'de> Visitor<'de> for MixPacketVisitor { + type Value = MixPacket; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a byte array representing a mix packet") + } + + fn visit_bytes(self, v: &[u8]) -> Result { + MixPacket::try_from_bytes(v).map_err(serde::de::Error::custom) + } +} + +impl<'de> Deserialize<'de> for MixPacket { + fn deserialize>(deserializer: D) -> Result { + deserializer.deserialize_bytes(MixPacketVisitor) + } } // TODO: test for serialization and errors! diff --git a/common/nymsphinx/src/receiver.rs b/common/nymsphinx/src/receiver.rs index ac1857fd82d..91dc87845aa 100644 --- a/common/nymsphinx/src/receiver.rs +++ b/common/nymsphinx/src/receiver.rs @@ -1,7 +1,10 @@ // Copyright 2021 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 +use std::io; + use crate::message::{NymMessage, NymMessageError, PaddedMessage, PlainMessage}; +use log::debug; use nym_crypto::aes::cipher::{KeyIvInit, StreamCipher}; use nym_crypto::asymmetric::encryption; use nym_crypto::shared_key::recompute_shared_key; @@ -16,10 +19,13 @@ use nym_sphinx_params::{ PacketEncryptionAlgorithm, PacketHkdfAlgorithm, ReplySurbEncryptionAlgorithm, DEFAULT_NUM_MIX_HOPS, }; +use serde::{Deserialize, Serialize}; use thiserror::Error; +use tokio_util::bytes::{Buf, BytesMut}; +use tokio_util::codec::{Decoder, Encoder}; // TODO: should this live in this file? -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct ReconstructedMessage { /// The actual plaintext message that was received. pub message: Vec, @@ -57,6 +63,62 @@ impl From for ReconstructedMessage { } } +pub struct ReconstructedMessageCodec; +const LENGHT_ENCODING_PREFIX_SIZE: usize = 4; + +impl Encoder for ReconstructedMessageCodec { + type Error = MessageRecoveryError; + + fn encode( + &mut self, + item: ReconstructedMessage, + buf: &mut BytesMut, + ) -> Result<(), Self::Error> { + let encoded = bincode::serialize(&item).expect("failed to serialize ReconstructedMessage"); + let encoded_len = encoded.len() as u32; + let mut encoded_with_len = encoded_len.to_le_bytes().to_vec(); + encoded_with_len.extend(encoded); + buf.reserve(encoded_with_len.len()); + buf.extend_from_slice(&encoded_with_len); + Ok(()) + } +} + +impl Decoder for ReconstructedMessageCodec { + type Item = ReconstructedMessage; + type Error = MessageRecoveryError; + + fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { + if buf.len() < LENGHT_ENCODING_PREFIX_SIZE { + return Ok(None); + } + + let len = u32::from_le_bytes( + buf[0..LENGHT_ENCODING_PREFIX_SIZE] + .try_into() + .expect("We know that we have at least LENGHT_ENCODING_PREFIX_SIZE bytes in there"), + ) as usize; + + if buf.len() < len + LENGHT_ENCODING_PREFIX_SIZE { + return Ok(None); + } + + let decoded = match bincode::deserialize( + &buf[LENGHT_ENCODING_PREFIX_SIZE..len + LENGHT_ENCODING_PREFIX_SIZE], + ) { + Ok(decoded) => decoded, + Err(e) => { + debug!("Failed to decode the message - {:?}", e); + return Ok(None); + } + }; + + buf.advance(len + LENGHT_ENCODING_PREFIX_SIZE); + + Ok(Some(decoded)) + } +} + #[derive(Debug, Error)] pub enum MessageRecoveryError { #[error("The received message did not contain enough bytes to recover the ephemeral public key. Got {provided}. required: {required}")] @@ -74,6 +136,9 @@ pub enum MessageRecoveryError { #[error("Failed to recover message fragment - {0}")] FragmentRecoveryError(#[from] ChunkingError), + + #[error("Failed to recover message fragment - {0}")] + MessageRecoveryError(#[from] io::Error), } pub trait MessageReceiver { diff --git a/common/socks5-client-core/Cargo.toml b/common/socks5-client-core/Cargo.toml index 344b60cd64b..17194a729e9 100644 --- a/common/socks5-client-core/Cargo.toml +++ b/common/socks5-client-core/Cargo.toml @@ -19,6 +19,7 @@ serde = { workspace = true, features = ["derive"] } # for config serialization/d tap = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "net", "signal"] } +tokio-util = { workspace = true } url = { workspace = true } nym-bandwidth-controller = { path = "../../common/bandwidth-controller" } diff --git a/common/socks5-client-core/src/socks/client.rs b/common/socks5-client-core/src/socks/client.rs index cca9cd6b53d..d7908a15087 100644 --- a/common/socks5-client-core/src/socks/client.rs +++ b/common/socks5-client-core/src/socks/client.rs @@ -7,6 +7,7 @@ use super::{SocksVersion, RESERVED, SOCKS4_VERSION, SOCKS5_VERSION}; use crate::config; use futures::channel::mpsc; use futures::task::{Context, Poll}; +use futures::SinkExt; use log::*; use nym_client_core::client::inbound_messages::{InputMessage, InputMessageSender}; use nym_service_providers_common::interface::{ProviderInterfaceVersion, RequestVersion}; diff --git a/common/socks5/proxy-helpers/src/ordered_sender.rs b/common/socks5/proxy-helpers/src/ordered_sender.rs index d71f8435587..de30293eb04 100644 --- a/common/socks5/proxy-helpers/src/ordered_sender.rs +++ b/common/socks5/proxy-helpers/src/ordered_sender.rs @@ -3,11 +3,12 @@ use crate::proxy_runner::MixProxySender; use bytes::Bytes; +use futures::SinkExt; use log::{debug, error}; use nym_socks5_requests::{ConnectionId, SocketData}; use std::io; -pub(crate) struct OrderedMessageSender { +pub(crate) struct OrderedMessageSender { connection_id: ConnectionId, // addresses are provided for better logging local_destination_address: String, @@ -18,7 +19,7 @@ pub(crate) struct OrderedMessageSender { mix_message_adapter: F, } -impl OrderedMessageSender +impl OrderedMessageSender where F: Fn(SocketData) -> S, { @@ -55,7 +56,7 @@ where (self.mix_message_adapter)(data) } - async fn send_message(&self, message: S) { + async fn send_message(&mut self, message: S) { if self.mixnet_sender.send(message).await.is_err() { panic!("BatchRealMessageReceiver has stopped receiving!") } diff --git a/common/socks5/proxy-helpers/src/proxy_runner/inbound.rs b/common/socks5/proxy-helpers/src/proxy_runner/inbound.rs index 82e6396a5fa..60bd0bb7868 100644 --- a/common/socks5/proxy-helpers/src/proxy_runner/inbound.rs +++ b/common/socks5/proxy-helpers/src/proxy_runner/inbound.rs @@ -74,7 +74,7 @@ async fn wait_for_lane( } } -pub(super) async fn run_inbound( +pub(super) async fn run_inbound( mut reader: OwnedReadHalf, mut message_sender: OrderedMessageSender, connection_id: ConnectionId, diff --git a/common/socks5/proxy-helpers/src/proxy_runner/mod.rs b/common/socks5/proxy-helpers/src/proxy_runner/mod.rs index 38499ea59c2..dcf6298520e 100644 --- a/common/socks5/proxy-helpers/src/proxy_runner/mod.rs +++ b/common/socks5/proxy-helpers/src/proxy_runner/mod.rs @@ -9,6 +9,7 @@ use nym_task::TaskClient; use std::fmt::Debug; use std::{sync::Arc, time::Duration}; use tokio::{net::TcpStream, sync::Notify}; +use tokio_util::sync::PollSender; mod inbound; mod outbound; @@ -35,7 +36,7 @@ impl From<(Vec, bool)> for ProxyMessage { } } -pub type MixProxySender = tokio::sync::mpsc::Sender; +pub type MixProxySender = PollSender; pub type MixProxyReader = tokio::sync::mpsc::Receiver; // TODO: when we finally get to implementing graceful shutdown, diff --git a/common/statistics/src/collector.rs b/common/statistics/src/collector.rs index ee418a20285..9a995befb59 100644 --- a/common/statistics/src/collector.rs +++ b/common/statistics/src/collector.rs @@ -19,7 +19,7 @@ pub trait StatisticsCollector { interval: Duration, timestamp: DateTime, ) -> StatsMessage; - async fn send_stats_message(&self, stats_message: StatsMessage) -> Result<(), StatsError>; + async fn send_stats_message(&mut self, stats_message: StatsMessage) -> Result<(), StatsError>; async fn reset_stats(&mut self); } diff --git a/common/task/Cargo.toml b/common/task/Cargo.toml index 6f0f6b7579a..65095d6797a 100644 --- a/common/task/Cargo.toml +++ b/common/task/Cargo.toml @@ -12,6 +12,7 @@ futures = { workspace = true } log = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["macros", "sync"] } +serde = { workspace = true } [target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio] workspace = true diff --git a/common/task/src/connections.rs b/common/task/src/connections.rs index af13111526c..e55175f1eb7 100644 --- a/common/task/src/connections.rs +++ b/common/task/src/connections.rs @@ -2,11 +2,12 @@ // SPDX-License-Identifier: Apache-2.0 use futures::channel::mpsc; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; pub type ConnectionId = u64; -#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)] +#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)] pub enum TransmissionLane { General, // we need to treat surb-related requests and responses at higher priority diff --git a/gateway/src/node/statistics/collector.rs b/gateway/src/node/statistics/collector.rs index d82847509f6..889343e8065 100644 --- a/gateway/src/node/statistics/collector.rs +++ b/gateway/src/node/statistics/collector.rs @@ -52,7 +52,7 @@ impl StatisticsCollector for GatewayStatisticsCollector { } } - async fn send_stats_message(&self, stats_message: StatsMessage) -> Result<(), StatsError> { + async fn send_stats_message(&mut self, stats_message: StatsMessage) -> Result<(), StatsError> { build_and_send_statistics_request(stats_message, self.statistics_service_url.to_string()) .await } diff --git a/sdk/rust/nym-sdk/Cargo.toml b/sdk/rust/nym-sdk/Cargo.toml index b657db85704..4253c92e2f4 100644 --- a/sdk/rust/nym-sdk/Cargo.toml +++ b/sdk/rust/nym-sdk/Cargo.toml @@ -9,7 +9,10 @@ license.workspace = true [dependencies] async-trait = { workspace = true } bip39 = { workspace = true } -nym-client-core = { path = "../../../common/client-core", features = ["fs-surb-storage", "fs-gateways-storage"] } +nym-client-core = { path = "../../../common/client-core", features = [ + "fs-surb-storage", + "fs-gateways-storage", +] } nym-crypto = { path = "../../../common/crypto" } nym-gateway-requests = { path = "../../../gateway/gateway-requests" } nym-bandwidth-controller = { path = "../../../common/bandwidth-controller" } @@ -21,7 +24,9 @@ nym-sphinx = { path = "../../../common/nymsphinx" } nym-task = { path = "../../../common/task" } nym-topology = { path = "../../../common/topology" } nym-socks5-client-core = { path = "../../../common/socks5-client-core" } -nym-validator-client = { path = "../../../common/client-libs/validator-client", features = ["http-client"] } +nym-validator-client = { path = "../../../common/client-libs/validator-client", features = [ + "http-client", +] } nym-socks5-requests = { path = "../../../common/socks5/requests" } nym-ordered-buffer = { path = "../../../common/socks5/ordered-buffer" } nym-service-providers-common = { path = "../../../service-providers/common" } @@ -37,6 +42,8 @@ tap = { workspace = true } thiserror = { workspace = true } url = { workspace = true } toml = "0.5.10" +tokio = { workspace = true } +tokio-util = { workspace = true, features = ["codec"] } [dev-dependencies] anyhow = { workspace = true } diff --git a/sdk/rust/nym-sdk/examples/parallel_sending_and_receiving.rs b/sdk/rust/nym-sdk/examples/parallel_sending_and_receiving.rs index ca9b8437171..643aec1fb2c 100644 --- a/sdk/rust/nym-sdk/examples/parallel_sending_and_receiving.rs +++ b/sdk/rust/nym-sdk/examples/parallel_sending_and_receiving.rs @@ -16,7 +16,7 @@ async fn main() { let our_address = *client.nym_address(); println!("Our client nym address is: {our_address}"); - let sender = client.split_sender(); + let mut sender = client.split_sender(); // receiving task let receiving_task_handle = tokio::spawn(async move { diff --git a/sdk/rust/nym-sdk/src/mixnet/native_client.rs b/sdk/rust/nym-sdk/src/mixnet/native_client.rs index ad514d6513e..30f96c8d680 100644 --- a/sdk/rust/nym-sdk/src/mixnet/native_client.rs +++ b/sdk/rust/nym-sdk/src/mixnet/native_client.rs @@ -2,9 +2,11 @@ use crate::mixnet::client::MixnetClientBuilder; use crate::mixnet::traits::MixnetMessageSender; use crate::{Error, Result}; use async_trait::async_trait; -use futures::{ready, Stream, StreamExt}; +use bytes::{Buf as _, BytesMut}; +use futures::{ready, FutureExt, Sink, SinkExt, Stream, StreamExt}; use log::error; use nym_client_core::client::base_client::GatewayConnection; +use nym_client_core::client::inbound_messages::InputMessageCodec; use nym_client_core::client::{ base_client::{ClientInput, ClientOutput, ClientState}, inbound_messages::InputMessage, @@ -12,15 +14,18 @@ use nym_client_core::client::{ }; use nym_crypto::asymmetric::identity; use nym_sphinx::addressing::clients::Recipient; +use nym_sphinx::receiver::ReconstructedMessageCodec; use nym_sphinx::{params::PacketType, receiver::ReconstructedMessage}; use nym_task::{ connections::{ConnectionCommandSender, LaneQueueLengths}, TaskHandle, }; use nym_topology::NymTopology; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::sync::Arc; use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio_util::codec::{Encoder, FramedRead}; /// Client connected to the Nym mixnet. pub struct MixnetClient { @@ -51,6 +56,24 @@ pub struct MixnetClient { // internal state used for the `Stream` implementation _buffered: Vec, + + // internal state used for the `AsyncRead` implementation + _read: ReadBuffer, +} + +#[derive(Debug, Default)] +struct ReadBuffer { + buffer: BytesMut, +} + +impl ReadBuffer { + fn clear(&mut self) { + self.buffer.clear(); + } + + fn pending(&self) -> bool { + !self.buffer.is_empty() + } } impl MixnetClient { @@ -75,6 +98,7 @@ impl MixnetClient { task_handle, packet_type, _buffered: Vec::new(), + _read: ReadBuffer::default(), } } @@ -190,6 +214,25 @@ impl MixnetClient { // note: it's important to take ownership of the struct as if the shutdown is `TaskHandle::External`, // it must be dropped to finalize the shutdown } + + fn read_buffer_to_slice( + &mut self, + buf: &mut ReadBuf, + cx: &mut Context<'_>, + ) -> Poll> { + if self._read.buffer.len() < buf.capacity() { + // let written = self._read.buffer.len(); + buf.put_slice(&self._read.buffer); + self._read.clear(); + Poll::Ready(Ok(())) + } else { + let written = buf.capacity(); + buf.put_slice(&self._read.buffer.split_off(written)); + self._read.buffer.advance(written); + cx.waker().wake_by_ref(); + Poll::Ready(Ok(())) + } + } } #[derive(Clone)] @@ -198,6 +241,200 @@ pub struct MixnetClientSender { packet_type: Option, } +impl AsyncRead for MixnetClient { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf, + ) -> Poll> { + let mut codec = ReconstructedMessageCodec {}; + + if self._read.pending() { + return self.read_buffer_to_slice(buf, cx); + } + + let msg = match self.as_mut().poll_next(cx) { + Poll::Ready(Some(msg)) => msg, + Poll::Ready(None) => return Poll::Ready(Ok(())), + Poll::Pending => return Poll::Pending, + }; + + match codec.encode(msg, &mut self._read.buffer) { + Ok(_) => {} + Err(e) => { + error!("failed to encode reconstructed message: {:?}", e); + return Poll::Ready(Err(tokio::io::Error::new( + tokio::io::ErrorKind::Other, + "failed to encode reconstructed message", + ))); + } + }; + + self.read_buffer_to_slice(buf, cx) + } +} + +impl AsyncWrite for MixnetClient { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let codec = InputMessageCodec {}; + let mut reader = FramedRead::new(buf, codec); + let mut fut = reader.next(); + let msg = match fut.poll_unpin(cx) { + Poll::Ready(Some(Ok(msg))) => msg, + Poll::Ready(Some(Err(_))) => { + return Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::Other, + "failed to read message from input", + ))) + } + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(Ok(0)), + }; + + let msg_size = msg.serialized_size(); + + let mut fut = pin!(self.client_input.send(msg)); + match fut.poll_unpin(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(msg_size as usize)), + Poll::Ready(Err(_)) => Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::Other, + "failed to send message to mixnet", + ))), + Poll::Pending => Poll::Pending, + } + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Sink::poll_flush(self, cx) + .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "failed to flush the sink")) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + AsyncWrite::poll_flush(self, cx) + } +} + +impl Sink for MixnetClient { + type Error = Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.sender().poll_ready_unpin(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(_)) => Poll::Ready(Err(Error::MessageSendingFailure)), + Poll::Pending => Poll::Pending, + } + } + + fn start_send(mut self: Pin<&mut Self>, item: InputMessage) -> Result<()> { + self.sender() + .start_send_unpin(item) + .map_err(|_| Error::MessageSendingFailure) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.sender() + .poll_flush_unpin(cx) + .map_err(|_| Error::MessageSendingFailure) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.sender() + .poll_close_unpin(cx) + .map_err(|_| Error::MessageSendingFailure) + } +} + +// TODO: there should be a better way of implementing Sink and AsyncWrite over T: MixnetMessageSender +impl AsyncWrite for MixnetClientSender { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let codec = InputMessageCodec {}; + let mut reader = FramedRead::new(buf, codec); + let mut fut = reader.next(); + let msg = match fut.poll_unpin(cx) { + Poll::Ready(Some(Ok(msg))) => msg, + Poll::Ready(Some(Err(_))) => { + return Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::Other, + "failed to read message from input", + ))) + } + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(Ok(0)), + }; + + let msg_size = msg.serialized_size(); + + let mut fut = pin!(self.client_input.send(msg)); + match fut.poll_unpin(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(msg_size as usize)), + Poll::Ready(Err(_)) => Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::Other, + "failed to send message to mixnet", + ))), + Poll::Pending => Poll::Pending, + } + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Sink::poll_flush(self, cx) + .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "failed to flush the sink")) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + AsyncWrite::poll_flush(self, cx) + } +} + +impl Sink for MixnetClientSender { + type Error = Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.sender().poll_ready_unpin(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(_)) => Poll::Ready(Err(Error::MessageSendingFailure)), + Poll::Pending => Poll::Pending, + } + } + + fn start_send(mut self: Pin<&mut Self>, item: InputMessage) -> Result<()> { + self.sender() + .start_send_unpin(item) + .map_err(|_| Error::MessageSendingFailure) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.sender() + .poll_flush_unpin(cx) + .map_err(|_| Error::MessageSendingFailure) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.sender() + .poll_close_unpin(cx) + .map_err(|_| Error::MessageSendingFailure) + } +} + impl Stream for MixnetClient { type Item = ReconstructedMessage; @@ -234,12 +471,16 @@ impl MixnetMessageSender for MixnetClient { self.packet_type } - async fn send(&self, message: InputMessage) -> Result<()> { + async fn send(&mut self, message: InputMessage) -> Result<()> { self.client_input .send(message) .await .map_err(|_| Error::MessageSendingFailure) } + + fn sender(&mut self) -> &mut tokio_util::sync::PollSender { + &mut self.client_input.input_sender + } } #[async_trait] @@ -248,10 +489,14 @@ impl MixnetMessageSender for MixnetClientSender { self.packet_type } - async fn send(&self, message: InputMessage) -> Result<()> { + async fn send(&mut self, message: InputMessage) -> Result<()> { self.client_input .send(message) .await .map_err(|_| Error::MessageSendingFailure) } + + fn sender(&mut self) -> &mut tokio_util::sync::PollSender { + &mut self.client_input.input_sender + } } diff --git a/sdk/rust/nym-sdk/src/mixnet/traits.rs b/sdk/rust/nym-sdk/src/mixnet/traits.rs index 8eb78109f7a..d8649dfa7b2 100644 --- a/sdk/rust/nym-sdk/src/mixnet/traits.rs +++ b/sdk/rust/nym-sdk/src/mixnet/traits.rs @@ -16,9 +16,11 @@ pub trait MixnetMessageSender { None } + fn sender(&mut self) -> &mut tokio_util::sync::PollSender; + /// Sends a [`InputMessage`] to the mixnet. This is the most low-level sending function, for /// full customization. - async fn send(&self, message: InputMessage) -> Result<()>; + async fn send(&mut self, message: InputMessage) -> Result<()>; /// Sends data to the supplied Nym address with the default surb behaviour. /// @@ -35,7 +37,7 @@ pub trait MixnetMessageSender { /// client.send_plain_message(recipient, "hi").await.unwrap(); /// } /// ``` - async fn send_plain_message(&self, address: Recipient, message: M) -> Result<()> + async fn send_plain_message(&mut self, address: Recipient, message: M) -> Result<()> where M: AsRef<[u8]> + Send, { @@ -61,7 +63,7 @@ pub trait MixnetMessageSender { /// } /// ``` async fn send_message( - &self, + &mut self, address: Recipient, message: M, surbs: IncludedSurbs, @@ -103,7 +105,7 @@ pub trait MixnetMessageSender { /// client.send_reply(tag, b"hi").await.unwrap(); /// } /// ``` - async fn send_reply(&self, recipient_tag: AnonymousSenderTag, message: M) -> Result<()> + async fn send_reply(&mut self, recipient_tag: AnonymousSenderTag, message: M) -> Result<()> where M: AsRef<[u8]> + Send, { diff --git a/service-providers/ip-packet-router/src/mixnet_listener.rs b/service-providers/ip-packet-router/src/mixnet_listener.rs index 4e41f3d996e..f835d1d526a 100644 --- a/service-providers/ip-packet-router/src/mixnet_listener.rs +++ b/service-providers/ip-packet-router/src/mixnet_listener.rs @@ -605,7 +605,7 @@ impl MixnetListener { // When an incoming mixnet message triggers a response that we send back, such as during // connect handshake. - async fn handle_response(&self, response: IpPacketResponse) -> Result<()> { + async fn handle_response(&mut self, response: IpPacketResponse) -> Result<()> { let Some(recipient) = response.recipient() else { log::error!("No recipient in response packet, this should NOT happen!"); return Err(IpPacketRouterError::NoRecipientInResponse); @@ -635,7 +635,7 @@ impl MixnetListener { // A single incoming request can trigger multiple responses, such as when data requests contain // multiple IP packets. - async fn handle_responses(&self, responses: Vec) { + async fn handle_responses(&mut self, responses: Vec) { for response in responses { match response { Ok(Some(response)) => { diff --git a/service-providers/network-requester/Cargo.toml b/service-providers/network-requester/Cargo.toml index e0231081926..2ad5ff163a6 100644 --- a/service-providers/network-requester/Cargo.toml +++ b/service-providers/network-requester/Cargo.toml @@ -20,7 +20,7 @@ anyhow = { workspace = true } addr = { workspace = true } async-trait = { workspace = true } bs58 = { workspace = true } -clap = { workspace = true, features = ["cargo", "derive"]} +clap = { workspace = true, features = ["cargo", "derive"] } dirs = "4.0" futures = { workspace = true } humantime-serde = { workspace = true } @@ -33,19 +33,26 @@ regex = { workspace = true } reqwest = { workspace = true, features = ["json"] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } -sqlx = { workspace = true, features = ["runtime-tokio-rustls", "chrono"]} +sqlx = { workspace = true, features = ["runtime-tokio-rustls", "chrono"] } tap = { workspace = true } thiserror = { workspace = true } -tokio = { workspace = true, features = [ "net", "rt-multi-thread", "macros" ] } +tokio = { workspace = true, features = ["net", "rt-multi-thread", "macros"] } tokio-tungstenite = { workspace = true } +tokio-util = { workspace = true } url = { workspace = true } time = { workspace = true } zeroize = { workspace = true } # internal nym-async-file-watcher = { path = "../../common/async-file-watcher" } -nym-bin-common = { path = "../../common/bin-common", features = ["output_format"] } -nym-client-core = { path = "../../common/client-core", features = ["cli", "fs-gateways-storage", "fs-surb-storage"] } +nym-bin-common = { path = "../../common/bin-common", features = [ + "output_format", +] } +nym-client-core = { path = "../../common/client-core", features = [ + "cli", + "fs-gateways-storage", + "fs-surb-storage", +] } nym-client-websocket-requests = { path = "../../clients/native/websocket-requests" } nym-config = { path = "../../common/config" } nym-credentials = { path = "../../common/credentials" } diff --git a/service-providers/network-requester/src/core.rs b/service-providers/network-requester/src/core.rs index 01c395365cc..cf4b75794a5 100644 --- a/service-providers/network-requester/src/core.rs +++ b/service-providers/network-requester/src/core.rs @@ -10,13 +10,14 @@ use crate::{reply, socks5}; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; use futures::stream::StreamExt; +use futures::SinkExt; use log::{debug, warn}; use nym_bin_common::bin_info_owned; use nym_client_core::client::mix_traffic::transceiver::GatewayTransceiver; use nym_client_core::config::disk_persistence::CommonClientPaths; use nym_client_core::HardcodedTopologyProvider; use nym_network_defaults::NymNetworkDetails; -use nym_sdk::mixnet::{MixnetMessageSender, TopologyProvider}; +use nym_sdk::mixnet::TopologyProvider; use nym_service_providers_common::interface::{ BinaryInformation, ProviderInterfaceVersion, Request, RequestVersion, }; @@ -40,6 +41,7 @@ use nym_task::manager::TaskHandle; use nym_task::TaskClient; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; +use tokio_util::sync::PollSender; // Since it's an atomic, it's safe to be kept static and shared across threads static ACTIVE_PROXIES: AtomicUsize = AtomicUsize::new(0); @@ -284,6 +286,8 @@ impl NRServiceProviderBuilder { // going to be used by `mixnet_response_listener` let (mix_input_sender, mix_input_receiver) = tokio::sync::mpsc::channel::(1); + let mix_input_sender = PollSender::new(mix_input_sender); + // Controller for managing all active connections. let (mut active_connections_controller, controller_sender) = Controller::new( mixnet_client.connection_command_sender(), @@ -400,7 +404,7 @@ impl NRServiceProvider { /// Listens for any messages from `mix_reader` that should be written back to the mix network /// via the `websocket_writer`. async fn mixnet_response_listener( - mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender, + mut mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender, mut mix_input_reader: MixProxyReader, stats_collector: Option, packet_type: PacketType, @@ -425,7 +429,7 @@ impl NRServiceProvider { } let response_message = msg.into_input_message(packet_type); - mixnet_client_sender.send(response_message).await.unwrap(); + nym_sdk::mixnet::MixnetMessageSender::send(&mut mixnet_client_sender, response_message).await.unwrap(); } else { log::error!("Exiting: channel closed!"); break; @@ -443,7 +447,7 @@ impl NRServiceProvider { return_address: reply::MixnetAddress, biggest_packet_size: PacketSize, controller_sender: ControllerSender, - mix_input_sender: MixProxySender, + mut mix_input_sender: MixProxySender, lane_queue_lengths: LaneQueueLengths, mut shutdown: TaskClient, ) { @@ -537,7 +541,7 @@ impl NRServiceProvider { .unwrap_or(traffic_config.primary_packet_size); let controller_sender_clone = self.controller_sender.clone(); - let mix_input_sender_clone = self.mix_input_sender.clone(); + let mut mix_input_sender_clone = self.mix_input_sender.clone(); let lane_queue_lengths_clone = self.mixnet_client.shared_lane_queue_lengths(); let mut shutdown = self.shutdown.get_handle(); diff --git a/service-providers/network-requester/src/statistics/collector.rs b/service-providers/network-requester/src/statistics/collector.rs index 95bc4cce711..2e114ccff73 100644 --- a/service-providers/network-requester/src/statistics/collector.rs +++ b/service-providers/network-requester/src/statistics/collector.rs @@ -5,6 +5,7 @@ use super::error::StatsError; use crate::core::new_legacy_request_version; use crate::reply::MixnetMessage; use async_trait::async_trait; +use futures::SinkExt; use log::*; use nym_service_providers_common::interface::RequestVersion; use nym_socks5_proxy_helpers::proxy_runner::MixProxySender; @@ -165,7 +166,7 @@ impl StatisticsCollector for ServiceStatisticsCollector { } async fn send_stats_message( - &self, + &mut self, stats_message: StatsMessage, ) -> Result<(), CommonStatsError> { let msg = build_statistics_request_bytes(stats_message)?; diff --git a/wasm/client/Cargo.toml b/wasm/client/Cargo.toml index 48589a20c2f..e8c4422211c 100644 --- a/wasm/client/Cargo.toml +++ b/wasm/client/Cargo.toml @@ -1,6 +1,9 @@ [package] name = "nym-client-wasm" -authors = ["Dave Hrycyszyn ", "Jedrzej Stuczynski "] +authors = [ + "Dave Hrycyszyn ", + "Jedrzej Stuczynski ", +] version = "1.3.0-rc.0" edition = "2021" keywords = ["nym", "sphinx", "wasm", "webassembly", "privacy", "client"] @@ -22,15 +25,16 @@ serde_json = { workspace = true } serde-wasm-bindgen = { workspace = true } wasm-bindgen = { workspace = true } wasm-bindgen-futures = { workspace = true } -thiserror = { workspace = true } +thiserror = { workspace = true } tsify = { workspace = true, features = ["js"] } +tokio = { workspace = true, default-features = false, features = ["sync"] } nym-bin-common = { path = "../../common/bin-common" } wasm-client-core = { path = "../../common/wasm/client-core" } wasm-utils = { path = "../../common/wasm/utils" } nym-node-tester-utils = { path = "../../common/node-tester-utils", optional = true } -nym-node-tester-wasm = { path = "../node-tester", optional = true} +nym-node-tester-wasm = { path = "../node-tester", optional = true } [dev-dependencies] wasm-bindgen-test = { workspace = true } diff --git a/wasm/client/src/client.rs b/wasm/client/src/client.rs index ad54cb5e6f5..eb4a2c727a1 100644 --- a/wasm/client/src/client.rs +++ b/wasm/client/src/client.rs @@ -14,6 +14,7 @@ use crate::response_pusher::ResponsePusher; use js_sys::Promise; use serde::{Deserialize, Serialize}; use std::sync::Arc; +use tokio::sync::RwLock; use tsify::Tsify; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::future_to_promise; @@ -50,7 +51,7 @@ pub(crate) const NODE_TESTER_CLIENT_ID: &str = "_nym-node-tester-client"; #[wasm_bindgen] pub struct NymClient { self_address: String, - client_input: Arc, + client_input: Arc>, client_state: Arc, // keep track of the "old" topology for the purposes of node tester @@ -196,7 +197,7 @@ impl NymClientBuilder { Ok(NymClient { self_address, - client_input: Arc::new(client_input), + client_input: Arc::new(RwLock::new(client_input)), client_state: Arc::new(started_client.client_state), _full_topology: None, // this cannot failed as we haven't passed an external task manager diff --git a/wasm/client/src/helpers.rs b/wasm/client/src/helpers.rs index e8570d02b90..92951e49d44 100644 --- a/wasm/client/src/helpers.rs +++ b/wasm/client/src/helpers.rs @@ -1,8 +1,10 @@ // Copyright 2022-2023 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 +use futures::SinkExt; use js_sys::Promise; use std::sync::Arc; +use tokio::sync::RwLock; use wasm_bindgen::JsValue; use wasm_bindgen_futures::future_to_promise; use wasm_client_core::client::base_client::{ClientInput, ClientState}; @@ -48,10 +50,11 @@ pub(crate) trait InputSender { fn send_messages(&self, messages: Vec) -> Promise; } -impl InputSender for Arc { +impl InputSender for Arc> { fn send_message(&self, message: InputMessage) -> Promise { let this = Arc::clone(self); future_to_promise(async move { + let mut this = this.write().await; match this.input_sender.send(message).await { Ok(_) => Ok(JsValue::null()), Err(_) => Err(simple_js_error( @@ -64,6 +67,7 @@ impl InputSender for Arc { fn send_messages(&self, messages: Vec) -> Promise { let this = Arc::clone(self); future_to_promise(async move { + let mut this = this.write().await; for message in messages { if this.input_sender.send(message).await.is_err() { return Err(simple_js_error( diff --git a/wasm/mix-fetch/Cargo.toml b/wasm/mix-fetch/Cargo.toml index fd187f04e1e..f64d2001e15 100644 --- a/wasm/mix-fetch/Cargo.toml +++ b/wasm/mix-fetch/Cargo.toml @@ -24,7 +24,7 @@ tokio = { workspace = true, features = ["sync"] } url = { workspace = true } wasm-bindgen = { workspace = true } wasm-bindgen-futures = { workspace = true } -thiserror = { workspace = true } +thiserror = { workspace = true } tsify = { workspace = true, features = ["js"] } nym-bin-common = { path = "../../common/bin-common" } diff --git a/wasm/mix-fetch/src/client.rs b/wasm/mix-fetch/src/client.rs index 9e4bff5e321..55bc92580d0 100644 --- a/wasm/mix-fetch/src/client.rs +++ b/wasm/mix-fetch/src/client.rs @@ -8,10 +8,13 @@ use crate::go_bridge::goWasmSetMixFetchRequestTimeout; use crate::request_writer::RequestWriter; use crate::socks_helpers::{socks5_connect_request, socks5_data_request}; use crate::{config, RequestId}; +use futures::SinkExt; use js_sys::Promise; use nym_socks5_requests::RemoteAddress; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use tokio::sync::Mutex; +use tokio::sync::RwLock; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::future_to_promise; use wasm_client_core::client::base_client::{BaseClientBuilder, ClientInput, ClientOutput}; @@ -34,7 +37,7 @@ pub struct MixFetchClient { self_address: Recipient, - client_input: ClientInput, + client_input: Arc>, requests: ActiveRequests, @@ -131,7 +134,7 @@ impl MixFetchClientBuilder { invalidated: AtomicBool::new(false), mix_fetch_config: self.config.mix_fetch, self_address, - client_input, + client_input: Arc::new(RwLock::new(client_input)), requests: active_requests, // this cannot failed as we haven't passed an external task manager _task_manager: Mutex::new(started_client.task_handle.try_into_task_manager().unwrap()), @@ -208,6 +211,8 @@ impl MixFetchClient { // the expect here is fine as it implies an unrecoverable failure since one of the client core // tasks has terminated self.client_input + .write() + .await .input_sender .send(input) .await @@ -235,6 +240,8 @@ impl MixFetchClient { // the expect here is fine as it implies an unrecoverable failure since one of the client core // tasks has terminated self.client_input + .write() + .await .input_sender .send(input) .await