Skip to content
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion common/client-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ license.workspace = true

[dependencies]
async-trait = { workspace = true }
bincode = { workspace = true }
base64 = "0.21.2"
bs58 = { workspace = true }
cfg-if = { workspace = true }
Expand All @@ -26,6 +27,7 @@ tap = { workspace = true }
thiserror = { workspace = true }
url = { workspace = true, features = ["serde"] }
tokio = { workspace = true, features = ["macros"] }
tokio-util = { workspace = true, features = ["codec"] }
time = { workspace = true }
zeroize = { workspace = true }

Expand All @@ -47,7 +49,9 @@ nym-validator-client = { path = "../client-libs/validator-client", default-featu
nym-task = { path = "../task" }
nym-credential-storage = { path = "../credential-storage" }
nym-network-defaults = { path = "../network-defaults" }
nym-client-core-config-types = { path = "./config-types", features = ["disk-persistence"] }
nym-client-core-config-types = { path = "./config-types", features = [
"disk-persistence",
] }
nym-client-core-surb-storage = { path = "./surb-storage" }
nym-client-core-gateways-storage = { path = "./gateways-storage" }

Expand Down
9 changes: 4 additions & 5 deletions common/client-core/src/client/base_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::init::{
};
use crate::{config, spawn_future};
use futures::channel::mpsc;
use futures::SinkExt;
use log::{debug, error, info, warn};
use nym_bandwidth_controller::BandwidthController;
use nym_client_core_gateways_storage::{GatewayDetails, GatewaysDetailsStore};
Expand All @@ -59,6 +60,7 @@ use std::fmt::Debug;
use std::os::raw::c_int as RawFd;
use std::path::Path;
use std::sync::Arc;
use tokio_util::sync::{PollSendError, PollSender};
use url::Url;

#[cfg(all(
Expand All @@ -78,10 +80,7 @@ pub struct ClientInput {
}

impl ClientInput {
pub async fn send(
&self,
message: InputMessage,
) -> Result<(), tokio::sync::mpsc::error::SendError<InputMessage>> {
pub async fn send(&mut self, message: InputMessage) -> Result<(), PollSendError<InputMessage>> {
self.input_sender.send(message).await
}
}
Expand Down Expand Up @@ -804,7 +803,7 @@ where
client_input: ClientInputStatus::AwaitingProducer {
client_input: ClientInput {
connection_command_sender: client_connection_tx,
input_sender,
input_sender: PollSender::new(input_sender),
},
},
client_output: ClientOutputStatus::AwaitingConsumer {
Expand Down
63 changes: 60 additions & 3 deletions common/client-core/src/client/inbound_messages.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
// Copyright 2020-2023 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: Apache-2.0

use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
use serde::{Deserialize, Serialize};
use tokio_util::{
bytes::Buf,
bytes::BytesMut,
codec::{Decoder, Encoder},
};

use crate::error::ClientCoreError;

pub type InputMessageSender = tokio::sync::mpsc::Sender<InputMessage>;
pub type InputMessageSender = tokio_util::sync::PollSender<InputMessage>;
pub type InputMessageReceiver = tokio::sync::mpsc::Receiver<InputMessage>;

#[derive(Debug)]
#[derive(Serialize, Deserialize, Debug)]
pub enum InputMessage {
/// Fire an already prepared mix packets into the network.
/// No guarantees are made about it. For example no retransmssion
Expand Down Expand Up @@ -64,6 +71,10 @@ pub enum InputMessage {
}

impl InputMessage {
pub fn simple(data: &[u8], recipient: Recipient) -> Self {
InputMessage::new_regular(recipient, data.to_vec(), TransmissionLane::General, None)
}

pub fn new_premade(
msgs: Vec<MixPacket>,
lane: TransmissionLane,
Expand Down Expand Up @@ -197,4 +208,50 @@ impl InputMessage {
InputMessage::MessageWrapper { message, .. } => message.lane(),
}
}

pub fn serialized_size(&self) -> u64 {
bincode::serialized_size(self).expect("failed to get serialized InputMessage size") + 4
}
}

// TODO: Tests
pub struct InputMessageCodec;

impl Encoder<InputMessage> for InputMessageCodec {
type Error = ClientCoreError;

fn encode(&mut self, item: InputMessage, buf: &mut BytesMut) -> Result<(), Self::Error> {
let encoded = bincode::serialize(&item).expect("failed to serialize InputMessage");
let encoded_len = encoded.len() as u32;
let mut encoded_with_len = encoded_len.to_le_bytes().to_vec();
encoded_with_len.extend(encoded);
buf.reserve(encoded_with_len.len());
buf.extend_from_slice(&encoded_with_len);
Ok(())
}
}

impl Decoder for InputMessageCodec {
type Item = InputMessage;
type Error = ClientCoreError;

fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if buf.len() < 4 {
return Ok(None);
}

let len = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
if buf.len() < len + 4 {
return Ok(None);
}

let decoded = match bincode::deserialize(&buf[4..len + 4]) {
Ok(decoded) => decoded,
Err(_) => return Ok(None),
};

buf.advance(len + 4);

Ok(Some(decoded))
}
}
19 changes: 16 additions & 3 deletions common/nymsphinx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ log = { workspace = true }
rand = { workspace = true }
rand_distr = { workspace = true }
thiserror = { workspace = true }
serde = { workspace = true }
bincode = { workspace = true }
tokio-util = { workspace = true, features = ["codec"] }

nym-sphinx-acknowledgements = { path = "acknowledgements" }
nym-sphinx-addressing = { path = "addressing" }
Expand All @@ -30,7 +33,9 @@ nym-topology = { path = "../topology" }

[dev-dependencies]
nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" }
nym-crypto = { path = "../crypto", version = "0.4.0", features = ["asymmetric"] }
nym-crypto = { path = "../crypto", version = "0.4.0", features = [
"asymmetric",
] }

# do not include this when compiling into wasm as it somehow when combined together with reqwest, it will require
# net2 via tokio-util -> tokio -> mio -> net2
Expand All @@ -43,5 +48,13 @@ features = ["sync"]

[features]
default = ["sphinx"]
sphinx = ["nym-crypto/sphinx", "nym-sphinx-params/sphinx", "nym-sphinx-types/sphinx"]
outfox = ["nym-crypto/outfox", "nym-sphinx-params/outfox", "nym-sphinx-types/outfox"]
sphinx = [
"nym-crypto/sphinx",
"nym-sphinx-params/sphinx",
"nym-sphinx-types/sphinx",
]
outfox = [
"nym-crypto/outfox",
"nym-sphinx-params/outfox",
"nym-sphinx-types/outfox",
]
3 changes: 2 additions & 1 deletion common/nymsphinx/anonymous-replies/src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use crate::{ReplySurb, ReplySurbError};
use nym_sphinx_addressing::clients::{Recipient, RecipientFormattingError};
use rand::{CryptoRng, RngCore};
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use std::mem;
use thiserror::Error;
Expand All @@ -24,7 +25,7 @@ pub enum InvalidAnonymousSenderTagRepresentation {
InvalidLength { received: usize, expected: usize },
}

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[cfg_attr(target_arch = "wasm32", wasm_bindgen)]
pub struct AnonymousSenderTag([u8; SENDER_TAG_SIZE]);

Expand Down
1 change: 1 addition & 0 deletions common/nymsphinx/forwarding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ nym-sphinx-params = { path = "../params" }
nym-sphinx-types = { path = "../types", features = ["sphinx", "outfox"] }
nym-outfox = { path = "../../../nym-outfox" }
thiserror = { workspace = true }
serde = { workspace = true }
37 changes: 37 additions & 0 deletions common/nymsphinx/forwarding/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, NymNodeRoutingAddressError};
use nym_sphinx_params::{PacketSize, PacketType};
use nym_sphinx_types::{NymPacket, NymPacketError};
use serde::{
de::{self, Visitor},
Deserialize, Deserializer, Serialize, Serializer,
};

use std::fmt::{self, Debug, Formatter};
use thiserror::Error;
Expand Down Expand Up @@ -110,6 +114,39 @@ impl MixPacket {
.chain(self.packet.to_bytes()?)
.collect())
}

pub fn to_bytes(&self) -> Result<Vec<u8>, MixPacketFormattingError> {
Ok(std::iter::once(self.packet_type as u8)
.chain(self.next_hop.as_bytes())
.chain(self.packet.to_bytes()?)
.collect())
}
}

impl Serialize for MixPacket {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
serializer.serialize_bytes(&self.to_bytes().map_err(serde::ser::Error::custom)?)
}
}

struct MixPacketVisitor;

impl<'de> Visitor<'de> for MixPacketVisitor {
type Value = MixPacket;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a byte array representing a mix packet")
}

fn visit_bytes<E: de::Error>(self, v: &[u8]) -> Result<Self::Value, E> {
MixPacket::try_from_bytes(v).map_err(serde::de::Error::custom)
}
}

impl<'de> Deserialize<'de> for MixPacket {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
deserializer.deserialize_bytes(MixPacketVisitor)
}
}

// TODO: test for serialization and errors!
Loading