diff --git a/Cargo.lock b/Cargo.lock index 26ecc4a3b98..5b103f7ad20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6521,7 +6521,6 @@ dependencies = [ "nym-validator-client", "nym-wireguard", "once_cell", - "pretty_env_logger", "rand 0.7.3", "rand 0.8.5", "serde", @@ -6723,7 +6722,6 @@ dependencies = [ "nym-types", "nym-validator-client", "opentelemetry", - "pretty_env_logger", "rand 0.7.3", "serde", "serde_json", @@ -6748,6 +6746,7 @@ dependencies = [ "log", "nym-bin-common", "nym-crypto", + "nym-mixnet-contract-common", "nym-network-defaults", "nym-sphinx-acknowledgements", "nym-sphinx-addressing", @@ -6757,6 +6756,7 @@ dependencies = [ "nym-sphinx-types", "nym-task", "nym-validator-client", + "parking_lot 0.12.1", "rand 0.8.5", "serde", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 4d786023d4f..154b4327900 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -162,8 +162,10 @@ serde_json = "1.0.91" tap = "1.0.1" time = "0.3.30" thiserror = "1.0.48" -tokio = "1.24.1" +tokio = "1.33.0" +tokio-stream = "0.1.14" tokio-tungstenite = "0.20.1" +tokio-util = "0.7.9" tracing = "0.1.37" tungstenite = { version = "0.20.1", default-features = false } ts-rs = "7.0.0" diff --git a/common/client-core/Cargo.toml b/common/client-core/Cargo.toml index ccd2113a284..10fef9db221 100644 --- a/common/client-core/Cargo.toml +++ b/common/client-core/Cargo.toml @@ -47,7 +47,7 @@ nym-credential-storage = { path = "../credential-storage" } nym-network-defaults = { path = "../network-defaults" } [target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream] -version = "0.1.11" +workspace = true features = ["time"] [target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio] @@ -55,7 +55,7 @@ workspace = true features = ["time"] [target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-tungstenite] -version = "0.20.1" +workspace = true [target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx] version = "0.6.2" diff --git a/common/client-libs/gateway-client/Cargo.toml b/common/client-libs/gateway-client/Cargo.toml index 92b1134512b..d6497fe624e 100644 --- a/common/client-libs/gateway-client/Cargo.toml +++ b/common/client-libs/gateway-client/Cargo.toml @@ -40,7 +40,7 @@ workspace = true features = ["macros", "rt", "net", "sync", "time"] [target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream] -version = "0.1.11" +workspace = true features = ["net", "sync", "time"] [target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-tungstenite] diff --git a/common/client-libs/mixnet-client/Cargo.toml b/common/client-libs/mixnet-client/Cargo.toml index 9eede069da8..5b4e4d06a12 100644 --- a/common/client-libs/mixnet-client/Cargo.toml +++ b/common/client-libs/mixnet-client/Cargo.toml @@ -10,7 +10,7 @@ edition = "2021" futures = { workspace = true } log = { workspace = true } tokio = { version = "1.24.1", features = ["time", "net", "rt"] } -tokio-util = { version = "0.7.4", features = ["codec"] } +tokio-util = { workspace = true, features = ["codec"] } # internal nym-sphinx = { path = "../../nymsphinx" } diff --git a/common/client-libs/mixnet-client/src/client.rs b/common/client-libs/mixnet-client/src/client.rs index 4da308144f1..15639508a2a 100644 --- a/common/client-libs/mixnet-client/src/client.rs +++ b/common/client-libs/mixnet-client/src/client.rs @@ -123,13 +123,10 @@ impl Client { // We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care // about neither receiver nor the connection, it doesn't matter which one gets consumed if let Err(err) = receiver.map(Ok).forward(conn).await { - warn!("Failed to forward packets to {} - {err}", address); + warn!("Failed to forward packets to {address} - {err}"); } - debug!( - "connection manager to {} is finished. Either the connection failed or mixnet client got dropped", - address - ); + debug!("connection manager to {address} is finished. Either the connection failed or mixnet client got dropped"); } /// If we're trying to reconnect, determine how long we should wait. @@ -207,7 +204,7 @@ impl SendWithoutResponse for Client { if let Some(sender) = self.conn_new.get_mut(&address) { if let Err(err) = sender.channel.try_send(framed_packet) { if err.is_full() { - debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address); + debug!("Connection to {address} seems to not be able to handle all the traffic - dropping the current packet"); // it's not a 'big' error, but we did not manage to send the packet // if the queue is full, we can't really do anything but to drop the packet Err(io::Error::new( @@ -215,10 +212,8 @@ impl SendWithoutResponse for Client { "connection queue is full", )) } else if err.is_disconnected() { - debug!( - "Connection to {} seems to be dead. attempting to re-establish it...", - address - ); + debug!("Connection to {address} seems to be dead. attempting to re-establish it..."); + // it's not a 'big' error, but we did not manage to send the packet, but queue // it up to send it as soon as the connection is re-established self.make_connection(address, err.into_inner()); @@ -238,7 +233,7 @@ impl SendWithoutResponse for Client { } } else { // there was never a connection to begin with - debug!("establishing initial connection to {}", address); + debug!("establishing initial connection to {address}"); // it's not a 'big' error, but we did not manage to send the packet, but queue the packet // for sending for as soon as the connection is created self.make_connection(address, framed_packet); diff --git a/common/client-libs/validator-client/src/nyxd/mod.rs b/common/client-libs/validator-client/src/nyxd/mod.rs index c9c27370e26..f1733e095ee 100644 --- a/common/client-libs/validator-client/src/nyxd/mod.rs +++ b/common/client-libs/validator-client/src/nyxd/mod.rs @@ -55,6 +55,9 @@ pub use tendermint_rpc::{ }; pub use tendermint_rpc::{Request, Response, SimpleRequest}; +#[cfg(feature = "http-client")] +pub use tendermint_rpc::Url as RpcUrl; + #[cfg(feature = "http-client")] use crate::http_client; #[cfg(feature = "http-client")] diff --git a/common/cosmwasm-smart-contracts/mixnet-contract/src/mixnode.rs b/common/cosmwasm-smart-contracts/mixnet-contract/src/mixnode.rs index a09d21dd2d2..5d4d96e34e8 100644 --- a/common/cosmwasm-smart-contracts/mixnet-contract/src/mixnode.rs +++ b/common/cosmwasm-smart-contracts/mixnet-contract/src/mixnode.rs @@ -623,6 +623,24 @@ pub enum Layer { Three = 3, } +impl Layer { + pub fn try_next(&self) -> Option { + match self { + Layer::One => Some(Layer::Two), + Layer::Two => Some(Layer::Three), + Layer::Three => None, + } + } + + pub fn try_previous(&self) -> Option { + match self { + Layer::One => None, + Layer::Two => Some(Layer::One), + Layer::Three => Some(Layer::Two), + } + } +} + impl From for String { fn from(layer: Layer) -> Self { (layer as u8).to_string() diff --git a/common/mixnode-common/Cargo.toml b/common/mixnode-common/Cargo.toml index b2fcf6e2a36..2ecd5c659e7 100644 --- a/common/mixnode-common/Cargo.toml +++ b/common/mixnode-common/Cargo.toml @@ -12,20 +12,21 @@ futures = { workspace = true } humantime-serde = "1.0" log = { workspace = true } rand = "0.8" -serde = { version = "1.0", features = ["derive"] } -tokio = { version = "1.24.1", features = [ +serde = { workspace = true, features = ["derive"] } +tokio = { workspace = true, features = [ "time", "macros", "rt", "net", "io-util", ] } -tokio-util = { version = "0.7.4", features = ["codec"] } +tokio-util = { workspace = true, features = ["codec"] } url = { workspace = true } thiserror = { workspace = true } +parking_lot = { workspace = true } ## tracing -tracing = { version = "0.1.37", optional = true } +tracing = { workspace = true, optional = true } nym-crypto = { path = "../crypto" } nym-network-defaults = { path = "../network-defaults" } @@ -37,6 +38,7 @@ nym-sphinx-params = { path = "../nymsphinx/params" } nym-sphinx-types = { path = "../nymsphinx/types" } nym-task = { path = "../task" } nym-validator-client = { path = "../client-libs/validator-client" } +nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" } nym-bin-common = { path = "../bin-common" } cfg-if = "1.0.0" diff --git a/common/mixnode-common/src/forward_travel/error.rs b/common/mixnode-common/src/forward_travel/error.rs new file mode 100644 index 00000000000..86ef9ac2d9e --- /dev/null +++ b/common/mixnode-common/src/forward_travel/error.rs @@ -0,0 +1,27 @@ +// Copyright 2023 - Nym Technologies SA +// SPDX-License-Identifier: Apache-2.0 + +use nym_validator_client::nyxd::error::NyxdError; +use std::net::IpAddr; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum ForwardTravelError { + #[error("received a connection request from a forbidden address: '{address}'")] + DisallowedIngressAddress { address: IpAddr }, + + #[error("received a request to open connection to a forbidden address: '{address}'")] + DisallowedEgressAddress { address: IpAddr }, + + #[error("no valid nyxd urls are available for topology queries")] + NoNyxdUrlsAvailable, + + #[error("nyxd interaction failure: {source}")] + NyxdFailure { + #[from] + source: NyxdError, + }, + + #[error("the current epoch appears to be stuck")] + StuckEpoch, +} diff --git a/common/mixnode-common/src/forward_travel/mod.rs b/common/mixnode-common/src/forward_travel/mod.rs new file mode 100644 index 00000000000..52b6c7de4a0 --- /dev/null +++ b/common/mixnode-common/src/forward_travel/mod.rs @@ -0,0 +1,356 @@ +// Copyright 2023 - Nym Technologies SA +// SPDX-License-Identifier: Apache-2.0 + +use crate::forward_travel::error::ForwardTravelError; +use log::{debug, error, info, trace, warn}; +use nym_mixnet_contract_common::{EpochId, GatewayBond, Layer, MixNodeBond}; +use nym_network_defaults::NymNetworkDetails; +use nym_task::TaskClient; +use nym_validator_client::client::IdentityKey; +use nym_validator_client::nyxd::contract_traits::{MixnetQueryClient, PagedMixnetQueryClient}; +use nym_validator_client::{nyxd, QueryHttpRpcNyxdClient}; +use parking_lot::RwLock; +use rand::seq::SliceRandom; +use rand::{thread_rng, Rng}; +use std::collections::HashSet; +use std::mem; +use std::net::{IpAddr, ToSocketAddrs}; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::sleep; +use url::Url; + +pub mod error; + +// TODO: to allow for separate ingress/egress we have to change our layer selection algorithm, i.e. it has to be announced before epoch change +pub type AllowedIngress = AllowedPaths; +pub type AllowedEgress = AllowedPaths; + +pub struct AllowedAddressesProvider { + current_epoch: EpochId, + + identity: IdentityKey, + + client_config: nyxd::Config, + + /// URLs to the nyxd validators for obtaining unfiltered network topology. + nyxd_endpoints: Vec, + + // to allow for separate ingress/egress we have to change our layer selection algorithm, i.e. it has to be announced before epoch change + // ingress: AllowedIngress, + // egress: AllowedEgress, + allowed: AllowedPaths, +} + +#[allow(dead_code)] +impl AllowedAddressesProvider { + pub async fn new( + identity: IdentityKey, + nyxd_endpoints: Vec, + allow_all: bool, + network_details: Option, + ) -> Result { + let network = network_details.unwrap_or(NymNetworkDetails::new_mainnet()); + let mut provider = AllowedAddressesProvider { + current_epoch: 0, + identity, + client_config: nyxd::Config::try_from_nym_network_details(&network)?, + nyxd_endpoints, + allowed: AllowedPaths::new(allow_all), + }; + + if !allow_all { + // set initial values for ingress/egress + let client = provider.ephemeral_nyxd_client()?; + provider.update_state(client).await?; + } + + Ok(provider) + } + + fn ephemeral_nyxd_client(&self) -> Result { + let mut possible_nyxd_endpoints = self.nyxd_endpoints.clone(); + possible_nyxd_endpoints.shuffle(&mut thread_rng()); + + let mut last_error = match QueryHttpRpcNyxdClient::connect( + self.client_config.clone(), + possible_nyxd_endpoints + .pop() + .ok_or(ForwardTravelError::NoNyxdUrlsAvailable)? + .as_str(), + ) { + Ok(client) => return Ok(client), + Err(err) => err, + }; + + for url in possible_nyxd_endpoints { + match QueryHttpRpcNyxdClient::connect(self.client_config.clone(), url.as_str()) { + Ok(client) => return Ok(client), + Err(err) => last_error = err, + }; + } + + Err(last_error.into()) + } + + pub fn ingress(&self) -> AllowedIngress { + self.allowed.clone() + } + + pub fn egress(&self) -> AllowedEgress { + self.allowed.clone() + } + + fn add_node_ips(raw_host: &str, identity: &str, set: &mut HashSet) { + if let Ok(ip) = IpAddr::from_str(raw_host) { + set.insert(ip); + } else { + // this might still be a valid hostname + // + // annoyingly there exists a method of looking up a socket address but not an ip address, + // so append any port and perform the lookup + let Ok(sockets) = format!("{raw_host}:1789").to_socket_addrs() else { + warn!("failed to resolve ip address of node '{identity}' (hostname: '{raw_host}')"); + return; + }; + + for socket in sockets { + set.insert(socket.ip()); + } + } + } + + fn get_all_addresses(nodes: &[MixNodeBond], gateways: &[GatewayBond]) -> HashSet { + let mut allowed = HashSet::new(); + + for node in nodes.iter() { + Self::add_node_ips(&node.mix_node.host, node.identity(), &mut allowed); + } + + for gateway in gateways.iter() { + Self::add_node_ips(&gateway.gateway.host, gateway.identity(), &mut allowed); + } + + allowed + } + + /// Gets ip addresses of all mixnodes on given layer + fn get_addresses_on_layer(layer: Layer, nodes: &[MixNodeBond]) -> HashSet { + let mut allowed = HashSet::new(); + + for node in nodes.iter().filter(|m| m.layer == layer) { + Self::add_node_ips(&node.mix_node.host, node.identity(), &mut allowed); + } + + allowed + } + + fn gateway_addresses(gateways: &[GatewayBond]) -> HashSet { + let mut allowed = HashSet::new(); + + for gateway in gateways.iter() { + Self::add_node_ips(&gateway.gateway.host, gateway.identity(), &mut allowed); + } + + allowed + } + + fn locate_layer(&self, nodes: &[MixNodeBond]) -> Option { + nodes + .iter() + .find(|m| m.identity() == self.identity) + .map(|m| m.layer) + } + + fn is_gateway(&self, gateways: &[GatewayBond]) -> bool { + gateways + .iter() + .any(|g| g.gateway.identity_key == self.identity) + } + + async fn update_state( + &mut self, + client: QueryHttpRpcNyxdClient, + ) -> Result<(), ForwardTravelError> { + let current_interval = client.get_current_interval_details().await?; + let current_epoch = current_interval.interval.current_epoch_absolute_id(); + + if current_epoch == self.current_epoch { + error!("can't update the allowed ips list as the epoch appears to be stuck"); + return Err(ForwardTravelError::StuckEpoch); + } + + let has_epoch_deviated = current_epoch > self.current_epoch + 1; + + let mixnodes = client.get_all_mixnode_bonds().await?; + let gateways = client.get_all_gateways().await?; + + let new_allowed = Self::get_all_addresses(&mixnodes, &gateways); + + // I'm leaving this code commented out to preserve this logic for when we need it + // to update to proper ingress/egress filtering + + // let our_mix_layer = self.locate_layer(&mixnodes); + // + // let previous_mix_layer = our_mix_layer.and_then(|l| l.try_previous()); + // let next_mix_layer = our_mix_layer.and_then(|l| l.try_next()); + // + // let (allowed_ingress, allowed_egress) = match (previous_mix_layer, next_mix_layer) { + // // layer 1 + // (None, Some(next)) => { + // let gateways = client.get_all_gateways().await?; + // + // ( + // Self::gateway_addresses(&gateways), + // Self::get_addresses_on_layer(next, &mixnodes), + // ) + // } + // // layer 2 + // (Some(previous), Some(next)) => ( + // Self::get_addresses_on_layer(previous, &mixnodes), + // Self::get_addresses_on_layer(next, &mixnodes), + // ), + // // layer 3 + // (Some(previous), None) => { + // let gateways = client.get_all_gateways().await?; + // ( + // Self::get_addresses_on_layer(previous, &mixnodes), + // Self::gateway_addresses(&gateways), + // ) + // } + // // gateway (or not bonded) + // (None, None) => { + // let gateways = client.get_all_gateways().await?; + // + // if self.is_gateway(&gateways) { + // let mut base_ingress = Self::get_addresses_on_layer(Layer::Three, &mixnodes); + // let mut base_egress = Self::get_addresses_on_layer(Layer::One, &mixnodes); + // + // // TODO: this extension should be conditional on whether the node is running the vpn module + // let gw_extension = Self::gateway_addresses(&gateways); + // + // base_ingress.extend(gw_extension.clone()); + // base_egress.extend(gw_extension.clone()); + // + // (base_ingress, base_egress) + // } else { + // warn!("our node doesn't appear to be bonded - going to permit traffic from ALL mixnodes and gateways"); + // let all = Self::get_all_addresses(&mixnodes, &gateways); + // (all.clone(), all) + // } + // } + // }; + + self.current_epoch = current_epoch; + self.allowed.advance_epoch(new_allowed, has_epoch_deviated); + + Ok(()) + } + + async fn update_allowed_addresses(&mut self) -> Result<(), ForwardTravelError> { + // create new client every epoch because it results in different nyxd endpoint being used + // what may help in distributing the load + let client = self.ephemeral_nyxd_client()?; + self.wait_for_epoch_rollover(&client).await?; + self.update_state(client).await + } + + async fn wait_for_epoch_rollover( + &self, + client: &QueryHttpRpcNyxdClient, + ) -> Result<(), ForwardTravelError> { + let current_interval = client.get_current_interval_details().await?; + let current_epoch = current_interval.interval.current_epoch_absolute_id(); + + if current_epoch <= self.current_epoch { + let remaining = current_interval.time_until_current_epoch_end(); + // add few more seconds to account for block time drift and to spread queries of all + // other nodes + let adjustment_secs = rand::thread_rng().gen_range(5..90); + sleep(remaining + Duration::from_secs(adjustment_secs)).await; + } + + Ok(()) + } + + pub async fn run(&mut self, mut task_client: TaskClient) { + if self.allowed.allow_all { + // debug_assert!(self.egress.allow_all); + + info!("the forward travel is currently disabled - there's no point in starting the route refresher"); + task_client.mark_as_success(); + return; + } + + debug!("Started ValidAddressesProvider with graceful shutdown support"); + while !task_client.is_shutdown() { + tokio::select! { + biased; + _ = task_client.recv() => { + trace!("ValidAddressesProvider: Received shutdown"); + } + res = self.update_allowed_addresses() => { + if let Err(err) = res { + warn!("failed to update the allowed addresses: {err}"); + + // don't retry immediately in case it was a network failure, wait a bit instead. + task_client.wait(Duration::from_secs(5 * 60)).await + } + } + } + } + task_client.recv_timeout().await; + log::debug!("ValidAddressesProvider: Exiting"); + } +} + +#[derive(Clone)] +pub struct AllowedPaths { + // this is fine that this value is not wrapped in an Arc and is not atomic given + // it's not expected to be modified at runtime + allow_all: bool, + inner: Arc>, +} + +impl AllowedPaths { + fn new(allow_all: bool) -> Self { + AllowedPaths { + allow_all, + inner: Arc::new(RwLock::new(AllowedPathsInner { + previous_epoch: HashSet::new(), + current_epoch: HashSet::new(), + })), + } + } + + pub fn is_allowed(&self, address: IpAddr) -> bool { + if self.allow_all { + return true; + } + + let guard = self.inner.read(); + guard.current_epoch.contains(&address) || guard.previous_epoch.contains(&address) + } + + fn advance_epoch(&self, current_epoch: HashSet, reset_previous: bool) { + // if this is triggered, it's an implementation bug; + // we shouldn't be updating data if we're allowing everything regardless + debug_assert!(!self.allow_all); + + let mut guard = self.inner.write(); + + let old_current = mem::replace(&mut guard.current_epoch, current_epoch); + + if reset_previous { + guard.previous_epoch = HashSet::new() + } else { + guard.previous_epoch = old_current; + } + } +} + +struct AllowedPathsInner { + previous_epoch: HashSet, + current_epoch: HashSet, +} diff --git a/common/mixnode-common/src/lib.rs b/common/mixnode-common/src/lib.rs index 9689b157a53..f781400b357 100644 --- a/common/mixnode-common/src/lib.rs +++ b/common/mixnode-common/src/lib.rs @@ -1,5 +1,7 @@ -// Copyright 2021 - Nym Technologies SA +// Copyright 2021-2023 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 + +pub mod forward_travel; pub mod packet_processor; pub mod verloc; diff --git a/common/mixnode-common/src/verloc/error.rs b/common/mixnode-common/src/verloc/error.rs index 4bfe0da48e0..1e39916b52c 100644 --- a/common/mixnode-common/src/verloc/error.rs +++ b/common/mixnode-common/src/verloc/error.rs @@ -1,81 +1,50 @@ -// Copyright 2021 - Nym Technologies SA +// Copyright 2021-2023 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 -use std::fmt::{self, Display, Formatter}; use std::io; +use thiserror::Error; -#[derive(Debug)] +#[derive(Debug, Error)] pub enum RttError { + #[error("the received echo packet had unexpected size")] UnexpectedEchoPacketSize, + + #[error("the received reply packet had unexpected size")] UnexpectedReplyPacketSize, + #[error("the received echo packet had malformed sender")] MalformedSenderIdentity, + #[error("the received echo packet had malformed signature")] MalformedEchoSignature, + + #[error("the received reply packet had malformed signature")] MalformedReplySignature, + #[error("the received echo packet had invalid signature")] InvalidEchoSignature, + + #[error("the received reply packet had invalid signature")] InvalidReplySignature, - UnreachableNode(String, io::Error), - UnexpectedConnectionFailureWrite(String, io::Error), - UnexpectedConnectionFailureRead(String, io::Error), + #[error("could not establish connection to {0}: {1}")] + UnreachableNode(String, #[source] io::Error), + + #[error("failed to write echo packet to {0}: {1}")] + UnexpectedConnectionFailureWrite(String, #[source] io::Error), + + #[error("failed to read reply packet from {0}: {1}")] + UnexpectedConnectionFailureRead(String, #[source] io::Error), + + #[error("timed out while trying to read reply packet from {0}")] ConnectionReadTimeout(String), + + #[error("timed out while trying to write echo packet to {0}")] ConnectionWriteTimeout(String), + #[error("the received reply packet had an unexpected sequence number")] UnexpectedReplySequence, + #[error("shutdown signal received")] ShutdownReceived, } - -impl Display for RttError { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - RttError::UnexpectedEchoPacketSize => { - write!(f, "The received echo packet had unexpected size") - } - RttError::UnexpectedReplyPacketSize => { - write!(f, "The received reply packet had unexpected size") - } - RttError::MalformedSenderIdentity => { - write!(f, "The received echo packet had malformed sender") - } - RttError::MalformedEchoSignature => { - write!(f, "The received echo packet had malformed signature") - } - RttError::MalformedReplySignature => { - write!(f, "The received reply packet had malformed signature") - } - RttError::InvalidEchoSignature => { - write!(f, "The received echo packet had invalid signature") - } - RttError::InvalidReplySignature => { - write!(f, "The received reply packet had invalid signature") - } - RttError::UnreachableNode(id, err) => { - write!(f, "Could not establish connection to {id} - {err}") - } - RttError::UnexpectedConnectionFailureWrite(id, err) => { - write!(f, "Failed to write echo packet to {id} - {err}") - } - RttError::UnexpectedConnectionFailureRead(id, err) => { - write!(f, "Failed to read reply packet from {id} - {err}") - } - RttError::ConnectionReadTimeout(id) => { - write!(f, "Timed out while trying to read reply packet from {id}") - } - RttError::ConnectionWriteTimeout(id) => { - write!(f, "Timed out while trying to write echo packet to {id}") - } - RttError::UnexpectedReplySequence => write!( - f, - "The received reply packet had an unexpected sequence number" - ), - RttError::ShutdownReceived => { - write!(f, "Shutdown signal received") - } - } - } -} - -impl std::error::Error for RttError {} diff --git a/common/nonexhaustive-delayqueue/Cargo.toml b/common/nonexhaustive-delayqueue/Cargo.toml index f79284dd502..04e1e5e5337 100644 --- a/common/nonexhaustive-delayqueue/Cargo.toml +++ b/common/nonexhaustive-delayqueue/Cargo.toml @@ -7,13 +7,13 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio-stream = "0.1.11" # this one seems to be a thing until `Stream` trait is stabilised in stdlib +tokio-stream = { workspace = true } # this one seems to be a thing until `Stream` trait is stabilised in stdlib [target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio] workspace = true [target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-util] -version = "0.7.4" +workspace = true features = ["time"] [target."cfg(target_arch = \"wasm32\")".dependencies.wasmtimer] diff --git a/common/nymsphinx/framing/Cargo.toml b/common/nymsphinx/framing/Cargo.toml index bf2c030e2e9..b62de91f531 100644 --- a/common/nymsphinx/framing/Cargo.toml +++ b/common/nymsphinx/framing/Cargo.toml @@ -9,7 +9,7 @@ repository = { workspace = true } [dependencies] bytes = "1.0" -tokio-util = { version = "0.7.4", features = ["codec"] } +tokio-util = { workspace = true, features = ["codec"] } thiserror = { workspace = true } nym-sphinx-types = { path = "../types", features = ["sphinx", "outfox"] } diff --git a/common/socks5/proxy-helpers/Cargo.toml b/common/socks5/proxy-helpers/Cargo.toml index 6ebaaa03c9c..c67f6a30292 100644 --- a/common/socks5/proxy-helpers/Cargo.toml +++ b/common/socks5/proxy-helpers/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" [dependencies] bytes = "1.0" tokio = { version = "1.24.1", features = [ "net", "io-util", "sync", "macros", "time", "rt-multi-thread" ] } -tokio-util = { version = "0.7.4", features = [ "io" ] } # reason for getting this guy is to to able to port to tokio 1.X more quickly by being able to use +tokio-util = { workspace = true, features = [ "io" ] } # reason for getting this guy is to to able to port to tokio 1.X more quickly by being able to use # their `read_buf` [from the util crate] replacement rather than having to rethink/reimplement `AvailableReader` with the new AsyncRead trait definition. # In the long run, the dependency should probably get removed in favour of pure-tokio implementation, but for time being it's fine. futures = { workspace = true } diff --git a/common/task/src/manager.rs b/common/task/src/manager.rs index c7eed1df2ac..1c8f00fec3e 100644 --- a/common/task/src/manager.rs +++ b/common/task/src/manager.rs @@ -373,6 +373,14 @@ impl TaskClient { } } + pub async fn wait(&mut self, duration: Duration) { + tokio::select! { + biased; + _ = self.recv() => (), + _ = sleep(duration) => (), + } + } + // Create a dummy that will never report that we should shutdown. pub fn dummy() -> TaskClient { let (_notify_tx, notify_rx) = watch::channel(()); diff --git a/ephemera/Cargo.toml b/ephemera/Cargo.toml index f0af6af3a9b..1a07747fdab 100644 --- a/ephemera/Cargo.toml +++ b/ephemera/Cargo.toml @@ -45,9 +45,9 @@ serde = { version = "1.0", features = ["derive"] } serde_derive = "1.0.149" serde_json = "1.0.91" thiserror = { workspace = true } -tokio = { version = "1", features = ["macros", "net","rt-multi-thread"] } +tokio = { workspace = true, features = ["macros", "net","rt-multi-thread"] } tokio-tungstenite = { workspace = true } -tokio-util = { version = "0.7.4", features = ["full"] } +tokio-util = { workspace = true, features = ["full"] } toml = "0.7.0" unsigned-varint = "0.7.1" utoipa = { workspace = true, features = ["actix_extras"] } diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index 285864be20d..24806f8be83 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -31,8 +31,7 @@ humantime-serde = "1.0.1" ipnetwork = "0.16" lazy_static = "1.4.0" log = { workspace = true } -once_cell = "1.7.2" -pretty_env_logger = "0.4" +once_cell = { workspace = true } rand = "0.7" serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } @@ -51,9 +50,9 @@ tokio = { workspace = true, features = [ "fs", "time", ] } -tokio-stream = { version = "0.1.11", features = ["fs"] } -tokio-tungstenite = { version = "0.20.1" } -tokio-util = { version = "0.7.4", features = ["codec"] } +tokio-stream = { workspace = true, features = ["fs"] } +tokio-tungstenite = { workspace = true } +tokio-util = { workspace = true, features = ["codec"] } url = { workspace = true, features = ["serde"] } zeroize = { workspace = true } diff --git a/gateway/src/commands/helpers.rs b/gateway/src/commands/helpers.rs index 3b97212ae63..13d7b97765c 100644 --- a/gateway/src/commands/helpers.rs +++ b/gateway/src/commands/helpers.rs @@ -38,6 +38,7 @@ pub(crate) struct OverrideConfig { pub(crate) statistics_service_url: Option, pub(crate) nym_apis: Option>, pub(crate) mnemonic: Option, + pub(crate) enforce_forward_travel: Option, pub(crate) nyxd_urls: Option>, pub(crate) only_coconut_credentials: Option, pub(crate) with_network_requester: Option, @@ -52,6 +53,10 @@ impl OverrideConfig { .with_optional(Config::with_listening_address, self.listening_address) .with_optional(Config::with_mix_port, self.mix_port) .with_optional(Config::with_clients_port, self.clients_port) + .with_optional( + Config::with_enforce_forward_travel, + self.enforce_forward_travel, + ) .with_optional_custom_env( Config::with_custom_nym_apis, self.nym_apis, diff --git a/gateway/src/commands/init.rs b/gateway/src/commands/init.rs index bfa12bd66d8..f299993bc54 100644 --- a/gateway/src/commands/init.rs +++ b/gateway/src/commands/init.rs @@ -20,41 +20,41 @@ use super::helpers::OverrideIpPacketRouterConfig; #[derive(Args, Clone)] pub struct Init { /// Id of the gateway we want to create config for - #[clap(long)] + #[arg(long)] id: String, /// The listening address on which the gateway will be receiving sphinx packets and listening for client data - #[clap(long, alias = "host")] + #[arg(long, alias = "host")] listening_address: IpAddr, /// Comma separated list of public ip addresses that will announced to the nym-api and subsequently to the clients. /// In nearly all circumstances, it's going to be identical to the address you're going to use for bonding. - #[clap(long, value_delimiter = ',')] + #[arg(long, value_delimiter = ',')] public_ips: Option>, /// Optional hostname associated with this gateway that will announced to the nym-api and subsequently to the clients - #[clap(long)] + #[arg(long)] hostname: Option, /// The port on which the gateway will be listening for sphinx packets - #[clap(long)] + #[arg(long)] mix_port: Option, /// The port on which the gateway will be listening for clients gateway-requests - #[clap(long)] + #[arg(long)] clients_port: Option, /// Path to sqlite database containing all gateway persistent data - #[clap(long)] + #[arg(long)] datastore: Option, /// Comma separated list of endpoints of nym APIs - #[clap(long, alias = "validator_apis", value_delimiter = ',')] + #[arg(long, alias = "validator_apis", value_delimiter = ',')] // the alias here is included for backwards compatibility (1.1.4 and before) nym_apis: Option>, /// Comma separated list of endpoints of the validator - #[clap( + #[arg( long, alias = "validators", alias = "nyxd_validators", @@ -65,47 +65,52 @@ pub struct Init { nyxd_urls: Option>, /// Cosmos wallet mnemonic needed for double spending protection - #[clap(long)] + #[arg(long)] mnemonic: Option, /// Set this gateway to work only with coconut credentials; that would disallow clients to /// bypass bandwidth credential requirement - #[clap(long, hide = true)] + #[arg(long, hide = true)] only_coconut_credentials: Option, /// Enable/disable gateway anonymized statistics that get sent to a statistics aggregator server - #[clap(long)] + #[arg(long)] enabled_statistics: Option, /// URL where a statistics aggregator is running. The default value is a Nym aggregator server - #[clap(long)] + #[arg(long)] statistics_service_url: Option, + /// Specifies whether this node should accepts and send out packets that would only go to nodes + /// on the next mix layer + #[arg(long)] + enforce_forward_travel: bool, + /// Allows this gateway to run an embedded network requester for minimal network overhead - #[clap(long, conflicts_with = "with_ip_packet_router")] + #[arg(long, conflicts_with = "with_ip_packet_router")] with_network_requester: bool, /// Allows this gateway to run an embedded network requester for minimal network overhead - #[clap(long, hide = true, conflicts_with = "with_network_requester")] + #[arg(long, hide = true, conflicts_with = "with_network_requester")] with_ip_packet_router: bool, // ##### NETWORK REQUESTER FLAGS ##### /// Specifies whether this network requester should run in 'open-proxy' mode - #[clap(long, requires = "with_network_requester")] + #[arg(long, requires = "with_network_requester")] open_proxy: Option, /// Enable service anonymized statistics that get sent to a statistics aggregator server - #[clap(long, requires = "with_network_requester")] + #[arg(long, requires = "with_network_requester")] enable_statistics: Option, /// Mixnet client address where a statistics aggregator is running. The default value is a Nym /// aggregator client - #[clap(long, requires = "with_network_requester")] + #[arg(long, requires = "with_network_requester")] statistics_recipient: Option, /// Mostly debug-related option to increase default traffic rate so that you would not need to /// modify config post init - #[clap( + #[arg( long, hide = true, conflicts_with = "medium_toggle", @@ -114,7 +119,7 @@ pub struct Init { fastmode: bool, /// Disable loop cover traffic and the Poisson rate limiter (for debugging only) - #[clap( + #[arg( long, hide = true, conflicts_with = "medium_toggle", @@ -124,7 +129,7 @@ pub struct Init { /// Enable medium mixnet traffic, for experiments only. /// This includes things like disabling cover traffic, no per hop delays, etc. - #[clap( + #[arg( long, hide = true, conflicts_with = "no_cover", @@ -136,10 +141,10 @@ pub struct Init { /// Specifies whether this network requester will run using the default ExitPolicy /// as opposed to the allow list. /// Note: this setting will become the default in the future releases. - #[clap(long)] + #[arg(long)] with_exit_policy: Option, - #[clap(short, long, default_value_t = OutputFormat::default())] + #[arg(short, long, default_value_t = OutputFormat::default())] output: OutputFormat, } @@ -154,6 +159,7 @@ impl From for OverrideConfig { datastore: init_config.datastore, nym_apis: init_config.nym_apis, mnemonic: init_config.mnemonic, + enforce_forward_travel: Some(init_config.enforce_forward_travel), enabled_statistics: init_config.enabled_statistics, statistics_service_url: init_config.statistics_service_url, @@ -302,6 +308,7 @@ mod tests { no_cover: false, medium_toggle: false, with_exit_policy: None, + enforce_forward_travel: false, }; std::env::set_var(BECH32_PREFIX, "n"); diff --git a/gateway/src/commands/run.rs b/gateway/src/commands/run.rs index 76f7629de5d..2621ea549fa 100644 --- a/gateway/src/commands/run.rs +++ b/gateway/src/commands/run.rs @@ -108,6 +108,11 @@ pub struct Run { #[arg(long)] statistics_recipient: Option, + /// Specifies whether this node should accepts and send out packets that would only go to nodes + /// on the next mix layer + #[arg(long)] + enforce_forward_travel: Option, + /// Mostly debug-related option to increase default traffic rate so that you would not need to /// modify config post init #[arg(long, hide = true, conflicts_with = "medium_toggle")] @@ -157,6 +162,7 @@ impl From for OverrideConfig { datastore: run_config.datastore, nym_apis: run_config.nym_apis, mnemonic: run_config.mnemonic, + enforce_forward_travel: run_config.enforce_forward_travel, enabled_statistics: run_config.enabled_statistics, statistics_service_url: run_config.statistics_service_url, diff --git a/gateway/src/config/mod.rs b/gateway/src/config/mod.rs index b872a1dffd0..18900d53782 100644 --- a/gateway/src/config/mod.rs +++ b/gateway/src/config/mod.rs @@ -188,11 +188,13 @@ impl Config { self } + #[must_use] pub fn with_enabled_network_requester(mut self, enabled_network_requester: bool) -> Self { self.network_requester.enabled = enabled_network_requester; self } + #[must_use] pub fn with_default_network_requester_config_path(mut self) -> Self { self.storage_paths = self .storage_paths @@ -212,36 +214,43 @@ impl Config { self } + #[must_use] pub fn with_only_coconut_credentials(mut self, only_coconut_credentials: bool) -> Self { self.gateway.only_coconut_credentials = only_coconut_credentials; self } + #[must_use] pub fn with_enabled_statistics(mut self, enabled_statistics: bool) -> Self { self.gateway.enabled_statistics = enabled_statistics; self } + #[must_use] pub fn with_custom_statistics_service_url(mut self, statistics_service_url: Url) -> Self { self.gateway.statistics_service_url = statistics_service_url; self } + #[must_use] pub fn with_custom_nym_apis(mut self, nym_api_urls: Vec) -> Self { self.gateway.nym_api_urls = nym_api_urls; self } + #[must_use] pub fn with_custom_validator_nyxd(mut self, validator_nyxd_urls: Vec) -> Self { self.gateway.nyxd_urls = validator_nyxd_urls; self } + #[must_use] pub fn with_cosmos_mnemonic(mut self, cosmos_mnemonic: bip39::Mnemonic) -> Self { self.gateway.cosmos_mnemonic = cosmos_mnemonic; self } + #[must_use] pub fn with_listening_address(mut self, listening_address: IpAddr) -> Self { self.gateway.listening_address = listening_address; @@ -253,16 +262,25 @@ impl Config { self } + #[must_use] pub fn with_mix_port(mut self, port: u16) -> Self { self.gateway.mix_port = port; self } + #[must_use] pub fn with_clients_port(mut self, port: u16) -> Self { self.gateway.clients_port = port; self } + #[must_use] + pub fn with_enforce_forward_travel(mut self, forward_travel: bool) -> Self { + self.debug.enforce_forward_travel = forward_travel; + self + } + + #[must_use] pub fn with_custom_persistent_store(mut self, store_dir: PathBuf) -> Self { self.storage_paths.clients_storage = store_dir; self @@ -328,7 +346,8 @@ pub struct Gateway { #[zeroize(skip)] pub nym_api_urls: Vec, - /// Addresses to validators which the node uses to check for double spending of ERC20 tokens. + /// Addresses to nyxd validators via which the node can communicate with the chain directly, + /// including for checking for double spending of coconut credentials. #[serde(alias = "validator_nymd_urls")] #[zeroize(skip)] pub nyxd_urls: Vec, @@ -421,6 +440,10 @@ pub struct Debug { /// Number of messages from offline client that can be pulled at once from the storage. pub message_retrieval_limit: i64, + /// Specifies whether this node should accepts and send out packets that would only go to nodes + /// on the next mix layer. + pub enforce_forward_travel: bool, + /// Specifies whether the mixnode should be using the legacy framing for the sphinx packets. // it's set to true by default. The reason for that decision is to preserve compatibility with the // existing nodes whilst everyone else is upgrading and getting the code for handling the new field. @@ -438,6 +461,9 @@ impl Default for Debug { maximum_connection_buffer_size: DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE, stored_messages_filename_length: DEFAULT_STORED_MESSAGE_FILENAME_LENGTH, message_retrieval_limit: DEFAULT_MESSAGE_RETRIEVAL_LIMIT, + + // let's keep it disabled for now to not surprise operators/users + enforce_forward_travel: false, use_legacy_framed_packet_version: false, } } diff --git a/gateway/src/config/old_config_v1_1_31.rs b/gateway/src/config/old_config_v1_1_31.rs index 18841a49936..a1891744c2d 100644 --- a/gateway/src/config/old_config_v1_1_31.rs +++ b/gateway/src/config/old_config_v1_1_31.rs @@ -163,6 +163,7 @@ impl From for Config { presence_sending_delay: value.debug.presence_sending_delay, stored_messages_filename_length: value.debug.stored_messages_filename_length, message_retrieval_limit: value.debug.message_retrieval_limit, + enforce_forward_travel: false, use_legacy_framed_packet_version: value.debug.use_legacy_framed_packet_version, }, } diff --git a/gateway/src/config/template.rs b/gateway/src/config/template.rs index 1b6ac712624..58ffd6265ec 100644 --- a/gateway/src/config/template.rs +++ b/gateway/src/config/template.rs @@ -116,4 +116,10 @@ ip_packet_router_config = '{{ storage_paths.ip_packet_router_config }}' # TODO +[debug] + +# Specifies whether this node should accepts and send out packets that would only go to nodes +# on the next mix layer. +enforce_forward_travel = {{ debug.enforce_forward_travel }} + "#; diff --git a/gateway/src/error.rs b/gateway/src/error.rs index 61f75981efe..8cc8f765bb8 100644 --- a/gateway/src/error.rs +++ b/gateway/src/error.rs @@ -3,6 +3,7 @@ use crate::node::storage::error::StorageError; use nym_ip_packet_router::error::IpPacketRouterError; +use nym_mixnode_common::forward_travel::error::ForwardTravelError; use nym_network_requester::error::{ClientCoreError, NetworkRequesterError}; use nym_validator_client::nyxd::error::NyxdError; use nym_validator_client::nyxd::AccountId; @@ -131,6 +132,12 @@ pub(crate) enum GatewayError { source: NyxdError, }, + #[error("failure in enforcing forward travel of mix packets: {source}")] + ForwardTravel { + #[from] + source: ForwardTravelError, + }, + // TODO: in the future this should work the other way, i.e. NymNode depending on Gateway errors #[error(transparent)] NymNodeError(#[from] nym_node::error::NymNodeError), diff --git a/gateway/src/node/client_handling/websocket/connection_handler/authenticated.rs b/gateway/src/node/client_handling/websocket/connection_handler/authenticated.rs index 103c1ad102f..f04ffb8eb2c 100644 --- a/gateway/src/node/client_handling/websocket/connection_handler/authenticated.rs +++ b/gateway/src/node/client_handling/websocket/connection_handler/authenticated.rs @@ -15,12 +15,12 @@ use nym_sphinx::forwarding::packet::MixPacket; use nym_task::TaskClient; use nym_validator_client::coconut::CoconutApiError; use rand::{CryptoRng, Rng}; +use std::net::SocketAddr; +use std::{convert::TryFrom, process, time::Duration}; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_tungstenite::tungstenite::{protocol::Message, Error as WsError}; -use std::{convert::TryFrom, process, time::Duration}; - use crate::node::{ client_handling::{ bandwidth::Bandwidth, @@ -40,13 +40,13 @@ pub(crate) enum RequestHandlingError { #[error("Internal gateway storage error")] StorageError(#[from] StorageError), - #[error("Provided bandwidth IV is malformed - {0}")] + #[error("Provided bandwidth IV is malformed: {0}")] MalformedIV(#[from] IVConversionError), - #[error("Provided binary request was malformed - {0}")] + #[error("Provided binary request was malformed: {0}")] InvalidBinaryRequest(#[from] GatewayRequestsError), - #[error("Provided binary request was malformed - {0}")] + #[error("Provided binary request was malformed: {0}")] InvalidTextRequest(>::Error), #[error("The received request is not valid in the current context")] @@ -61,10 +61,10 @@ pub(crate) enum RequestHandlingError { #[error("This gateway is only accepting coconut credentials for bandwidth")] OnlyCoconutCredentials, - #[error("Nyxd Error - {0}")] + #[error("Nyxd Error: {0}")] NyxdError(#[from] nym_validator_client::nyxd::error::NyxdError), - #[error("Validator API error - {0}")] + #[error("Validator API error: {0}")] APIError(#[from] nym_validator_client::ValidatorClientError), #[error("Not enough nym API endpoints provided. Needed {needed}, received {received}")] @@ -73,14 +73,17 @@ pub(crate) enum RequestHandlingError { #[error("There was a problem with the proposal id: {reason}")] ProposalIdError { reason: String }, - #[error("Coconut interface error - {0}")] + #[error("Coconut interface error: {0}")] CoconutInterfaceError(#[from] nym_coconut_interface::error::CoconutInterfaceError), #[error("coconut api query failure: {0}")] CoconutApiError(#[from] CoconutApiError), - #[error("Credential error - {0}")] + #[error("Credential error: {0}")] CredentialError(#[from] nym_credentials::error::Error), + + #[error("the outbound address of the received packet ('{address}') is forbidden as there are no nodes with that address on the next layer")] + InvalidForwardHop { address: SocketAddr }, } impl RequestHandlingError { @@ -203,11 +206,25 @@ where /// # Arguments /// /// * `mix_packet`: packet received from the client that should get forwarded into the network. - fn forward_packet(&self, mix_packet: MixPacket) { + fn forward_packet(&self, mix_packet: MixPacket) -> Result<(), RequestHandlingError> { + let next_hop: SocketAddr = mix_packet.next_hop().into(); + + // TODO: another option is to move this filter + // (which is used by EVERY `ConnectionHandler`, so potentially hundreds of times) + // to packet forwarder where we could be filtering at the time of attempting to open new outbound connections + // However, in that case we wouldn't be able to return an error message to the client + if !self.inner.allowed_egress.is_allowed(next_hop.ip()) { + // TODO: perhaps this should get lowered in severity? + warn!("received an packet that was meant to get forwarded to {next_hop}, but this address does not belong to any node on the next layer - dropping the packet"); + return Err(RequestHandlingError::InvalidForwardHop { address: next_hop }); + } + if let Err(err) = self.inner.outbound_mix_sender.unbounded_send(mix_packet) { error!("We failed to forward requested mix packet - {err}. Presumably our mix forwarder has crashed. We cannot continue."); process::exit(1); } + + Ok(()) } /// Tries to handle the received bandwidth request by checking correctness of the received data @@ -317,7 +334,7 @@ where } self.consume_bandwidth(consumed_bandwidth).await?; - self.forward_packet(mix_packet); + self.forward_packet(mix_packet)?; Ok(ServerResponse::Send { remaining_bandwidth: available_bandwidth - consumed_bandwidth, diff --git a/gateway/src/node/client_handling/websocket/connection_handler/fresh.rs b/gateway/src/node/client_handling/websocket/connection_handler/fresh.rs index 2d08c85709e..57ddd056115 100644 --- a/gateway/src/node/client_handling/websocket/connection_handler/fresh.rs +++ b/gateway/src/node/client_handling/websocket/connection_handler/fresh.rs @@ -18,6 +18,7 @@ use nym_gateway_requests::{ BinaryResponse, PROTOCOL_VERSION, }; use nym_mixnet_client::forwarder::MixForwardingSender; +use nym_mixnode_common::forward_travel::AllowedEgress; use nym_sphinx::DestinationAddressBytes; use rand::{CryptoRng, Rng}; use std::{convert::TryFrom, sync::Arc, time::Duration}; @@ -89,6 +90,7 @@ impl InitialAuthenticationError { pub(crate) struct FreshHandler { rng: R, local_identity: Arc, + pub(crate) allowed_egress: AllowedEgress, pub(crate) only_coconut_credentials: bool, pub(crate) active_clients_store: ActiveClientsStore, pub(crate) outbound_mix_sender: MixForwardingSender, @@ -110,6 +112,7 @@ where pub(crate) fn new( rng: R, conn: S, + allowed_egress: AllowedEgress, only_coconut_credentials: bool, outbound_mix_sender: MixForwardingSender, local_identity: Arc, @@ -126,6 +129,7 @@ where local_identity, storage, coconut_verifier, + allowed_egress, } } diff --git a/gateway/src/node/client_handling/websocket/listener.rs b/gateway/src/node/client_handling/websocket/listener.rs index 27a03aacfc2..e4d66af7bd0 100644 --- a/gateway/src/node/client_handling/websocket/listener.rs +++ b/gateway/src/node/client_handling/websocket/listener.rs @@ -8,6 +8,7 @@ use crate::node::storage::Storage; use log::*; use nym_crypto::asymmetric::identity; use nym_mixnet_client::forwarder::MixForwardingSender; +use nym_mixnode_common::forward_travel::AllowedEgress; use rand::rngs::OsRng; use std::net::SocketAddr; use std::process; @@ -16,6 +17,7 @@ use tokio::task::JoinHandle; pub(crate) struct Listener { address: SocketAddr, + allowed_egress: AllowedEgress, local_identity: Arc, only_coconut_credentials: bool, pub(crate) coconut_verifier: Arc, @@ -24,12 +26,14 @@ pub(crate) struct Listener { impl Listener { pub(crate) fn new( address: SocketAddr, + allowed_egress: AllowedEgress, local_identity: Arc, only_coconut_credentials: bool, coconut_verifier: Arc, ) -> Self { Listener { address, + allowed_egress, local_identity, only_coconut_credentials, coconut_verifier, @@ -71,6 +75,7 @@ impl Listener { let handle = FreshHandler::new( OsRng, socket, + self.allowed_egress.clone(), self.only_coconut_credentials, outbound_mix_sender.clone(), Arc::clone(&self.local_identity), diff --git a/gateway/src/node/mixnet_handling/receiver/listener.rs b/gateway/src/node/mixnet_handling/receiver/listener.rs index 9ca22c77af4..eebfbf9fd76 100644 --- a/gateway/src/node/mixnet_handling/receiver/listener.rs +++ b/gateway/src/node/mixnet_handling/receiver/listener.rs @@ -4,6 +4,7 @@ use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler; use crate::node::storage::Storage; use log::*; +use nym_mixnode_common::forward_travel::AllowedIngress; use nym_task::TaskClient; use std::net::SocketAddr; use std::process; @@ -11,13 +12,22 @@ use tokio::task::JoinHandle; pub(crate) struct Listener { address: SocketAddr, + allowed_ingress: AllowedIngress, shutdown: TaskClient, } // TODO: this file is nearly identical to the one in mixnode impl Listener { - pub(crate) fn new(address: SocketAddr, shutdown: TaskClient) -> Self { - Listener { address, shutdown } + pub(crate) fn new( + address: SocketAddr, + allowed_ingress: AllowedIngress, + shutdown: TaskClient, + ) -> Self { + Listener { + address, + allowed_ingress, + shutdown, + } } pub(crate) async fn run(&mut self, connection_handler: ConnectionHandler) @@ -42,6 +52,12 @@ impl Listener { connection = tcp_listener.accept() => { match connection { Ok((socket, remote_addr)) => { + if !self.allowed_ingress.is_allowed(remote_addr.ip()) { + // TODO: perhaps this should get lowered in severity? + warn!("received an incoming connection from {remote_addr}, but this address does not belong to any node on the previous layer - dropping the connection"); + continue + } + let handler = connection_handler.clone(); tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone().named(format!("MixnetConnectionHandler_{remote_addr}")))); } diff --git a/gateway/src/node/mod.rs b/gateway/src/node/mod.rs index fa0f8b35b0b..f059b055dfc 100644 --- a/gateway/src/node/mod.rs +++ b/gateway/src/node/mod.rs @@ -28,6 +28,7 @@ use futures::channel::{mpsc, oneshot}; use log::*; use nym_crypto::asymmetric::{encryption, identity}; use nym_mixnet_client::forwarder::{MixForwardingSender, PacketForwarder}; +use nym_mixnode_common::forward_travel::{AllowedAddressesProvider, AllowedEgress, AllowedIngress}; use nym_network_defaults::NymNetworkDetails; use nym_network_requester::{LocalGateway, NRServiceProviderBuilder, RequestFilter}; use nym_node::wireguard::types::GatewayClientRegistry; @@ -178,6 +179,7 @@ impl Gateway { &self, ack_sender: MixForwardingSender, active_clients_store: ActiveClientsStore, + ingress: AllowedIngress, shutdown: TaskClient, ) where St: Storage + Clone + 'static, @@ -199,7 +201,8 @@ impl Gateway { self.config.gateway.mix_port, ); - mixnet_handling::Listener::new(listening_address, shutdown).start(connection_handler); + mixnet_handling::Listener::new(listening_address, ingress, shutdown) + .start(connection_handler); } #[cfg(feature = "wireguard")] @@ -212,6 +215,7 @@ impl Gateway { fn start_client_websocket_listener( &self, + allowed_egress: AllowedEgress, forwarding_channel: MixForwardingSender, active_clients_store: ActiveClientsStore, shutdown: TaskClient, @@ -228,6 +232,7 @@ impl Gateway { websocket::Listener::new( listening_address, + allowed_egress, Arc::clone(&self.identity_keypair), self.config.gateway.only_coconut_credentials, coconut_verifier, @@ -240,6 +245,28 @@ impl Gateway { ); } + async fn start_allowed_addresses_provider( + &self, + task_client: TaskClient, + ) -> Result<(AllowedIngress, AllowedEgress), GatewayError> { + let identity = self.identity_keypair.public_key().to_base58_string(); + let nyxd_endpoints = self.config.gateway.nyxd_urls.clone(); + + let network = NymNetworkDetails::new_from_env(); + let mut provider = AllowedAddressesProvider::new( + identity, + nyxd_endpoints, + !self.config.debug.enforce_forward_travel, + Some(network), + ) + .await?; + + let filters = (provider.ingress(), provider.egress()); + + tokio::spawn(async move { provider.run(task_client).await }); + Ok(filters) + } + fn start_packet_forwarder(&self, shutdown: TaskClient) -> MixForwardingSender { info!("Starting mix packet forwarder..."); @@ -449,6 +476,12 @@ impl Gateway { let shutdown = TaskManager::new(10); + let (ingress, egress) = self + .start_allowed_addresses_provider( + shutdown.subscribe().named("AllowedAddressesProvider"), + ) + .await?; + let coconut_verifier = { let nyxd_client = self.random_nyxd_client()?; CoconutVerifier::new(nyxd_client) @@ -461,6 +494,7 @@ impl Gateway { self.start_mix_socket_listener( mix_forwarding_channel.clone(), active_clients_store.clone(), + ingress, shutdown.subscribe().named("mixnet_handling::Listener"), ); @@ -478,6 +512,7 @@ impl Gateway { } self.start_client_websocket_listener( + egress, mix_forwarding_channel.clone(), active_clients_store.clone(), shutdown.subscribe().named("websocket::Listener"), diff --git a/mixnode/Cargo.toml b/mixnode/Cargo.toml index bdcb0b3a9bf..1cfd0b5dbb3 100644 --- a/mixnode/Cargo.toml +++ b/mixnode/Cargo.toml @@ -18,7 +18,7 @@ rust-version = "1.58.1" [dependencies] axum = { workspace = true } -anyhow = "1.0.40" +anyhow = { workspace = true } bs58 = "0.4.0" clap = { workspace = true, features = ["cargo", "derive"] } colored = "2.0" @@ -26,15 +26,14 @@ cupid = "0.6.1" dirs = "4.0" futures = { workspace = true } humantime-serde = "1.0" -lazy_static = "1.4.0" +lazy_static = { workspace = true } log = { workspace = true } -pretty_env_logger = "0.4.0" rand = "0.7.3" serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } sysinfo = "0.27.7" -tokio = { version = "1.21.2", features = ["rt-multi-thread", "net", "signal"] } -tokio-util = { version = "0.7.3", features = ["codec"] } +tokio = { workspace = true, features = ["rt-multi-thread", "net", "signal"] } +tokio-util = { workspace = true, features = ["codec"] } toml = "0.5.8" url = { workspace = true, features = ["serde"] } cfg-if = "1.0.0" diff --git a/mixnode/src/commands/init.rs b/mixnode/src/commands/init.rs index 3e678d3d4ae..da9a91d611e 100644 --- a/mixnode/src/commands/init.rs +++ b/mixnode/src/commands/init.rs @@ -16,31 +16,40 @@ use std::{fs, io}; #[derive(Args, Clone)] pub(crate) struct Init { /// Id of the mixnode we want to create config for - #[clap(long)] + #[arg(long)] id: String, /// The host on which the mixnode will be running - #[clap(long)] + #[arg(long)] host: IpAddr, /// The port on which the mixnode will be listening for mix packets - #[clap(long)] + #[arg(long)] mix_port: Option, /// The port on which the mixnode will be listening for verloc packets - #[clap(long)] + #[arg(long)] verloc_port: Option, /// The port on which the mixnode will be listening for http requests - #[clap(long)] + #[arg(long)] http_api_port: Option, /// Comma separated list of nym-api endpoints of the validators // the alias here is included for backwards compatibility (1.1.4 and before) - #[clap(long, alias = "validators", value_delimiter = ',')] + #[arg(long, alias = "validators", value_delimiter = ',')] nym_apis: Option>, - #[clap(short, long, default_value_t = OutputFormat::default())] + /// Comma separated list of endpoints of the nyxd validators + #[arg(long, value_delimiter = ',')] + nyxd_urls: Option>, + + /// Specifies whether this node should accepts and send out packets that would only go to nodes + /// on the next mix layer + #[arg(long)] + enforce_forward_travel: bool, + + #[arg(short, long, default_value_t = OutputFormat::default())] output: OutputFormat, } @@ -52,7 +61,9 @@ impl From for OverrideConfig { mix_port: init_config.mix_port, verloc_port: init_config.verloc_port, http_api_port: init_config.http_api_port, + enforce_forward_travel: Some(init_config.enforce_forward_travel), nym_apis: init_config.nym_apis, + nyxd_urls: init_config.nyxd_urls, } } } diff --git a/mixnode/src/commands/mod.rs b/mixnode/src/commands/mod.rs index dfa1fcbd42f..c917faf4320 100644 --- a/mixnode/src/commands/mod.rs +++ b/mixnode/src/commands/mod.rs @@ -10,7 +10,7 @@ use colored::Colorize; use log::{error, info, warn}; use nym_bin_common::completions::{fig_generate, ArgShell}; use nym_bin_common::version_checker; -use nym_config::defaults::var_names::{BECH32_PREFIX, NYM_API}; +use nym_config::defaults::var_names; use nym_config::OptionalSet; use nym_crypto::bech32_address_validation; use std::net::IpAddr; @@ -58,7 +58,9 @@ struct OverrideConfig { mix_port: Option, verloc_port: Option, http_api_port: Option, + enforce_forward_travel: Option, nym_apis: Option>, + nyxd_urls: Option>, } pub(crate) async fn execute(args: Cli) -> anyhow::Result<()> { @@ -83,22 +85,32 @@ fn override_config(config: Config, args: OverrideConfig) -> Config { .with_optional(Config::with_mix_port, args.mix_port) .with_optional(Config::with_verloc_port, args.verloc_port) .with_optional(Config::with_http_api_port, args.http_api_port) + .with_optional( + Config::with_enforce_forward_travel, + args.enforce_forward_travel, + ) .with_optional_custom_env( Config::with_custom_nym_apis, args.nym_apis, - NYM_API, + var_names::NYM_API, + nym_config::parse_urls, + ) + .with_optional_custom_env( + Config::with_custom_nyxd, + args.nyxd_urls, + var_names::NYXD, nym_config::parse_urls, ) } /// Ensures that a given bech32 address is valid, or exits pub(crate) fn validate_bech32_address_or_exit(address: &str) { - let prefix = std::env::var(BECH32_PREFIX).expect("bech32 prefix not set"); + let prefix = std::env::var(var_names::BECH32_PREFIX).expect("bech32 prefix not set"); if let Err(bech32_address_validation::Bech32Error::DecodeFailed(err)) = bech32_address_validation::try_bech32_decode(address) { let error_message = format!("Error: wallet address decoding failed: {err}").red(); - error!("{}", error_message); + error!("{error_message}"); error!("Exiting..."); process::exit(1); } @@ -107,7 +119,7 @@ pub(crate) fn validate_bech32_address_or_exit(address: &str) { bech32_address_validation::validate_bech32_prefix(&prefix, address) { let error_message = format!("Error: wallet address type is wrong, {err}").red(); - error!("{}", error_message); + error!("{error_message}"); error!("Exiting..."); process::exit(1); } diff --git a/mixnode/src/commands/run.rs b/mixnode/src/commands/run.rs index c4ab5125fbb..5d081886e23 100644 --- a/mixnode/src/commands/run.rs +++ b/mixnode/src/commands/run.rs @@ -14,35 +14,44 @@ use std::net::IpAddr; #[derive(Args, Clone)] pub(crate) struct Run { /// Id of the nym-mixnode we want to run - #[clap(long)] + #[arg(long)] id: String, /// The custom host on which the mixnode will be running - #[clap(long)] + #[arg(long)] host: Option, /// The wallet address you will use to bond this mixnode, e.g. nymt1z9egw0knv47nmur0p8vk4rcx59h9gg4zuxrrr9 - #[clap(long)] + #[arg(long)] wallet_address: Option, /// The port on which the mixnode will be listening for mix packets - #[clap(long)] + #[arg(long)] mix_port: Option, /// The port on which the mixnode will be listening for verloc packets - #[clap(long)] + #[arg(long)] verloc_port: Option, /// The port on which the mixnode will be listening for http requests - #[clap(long)] + #[arg(long)] http_api_port: Option, /// Comma separated list of nym-api endpoints of the validators // the alias here is included for backwards compatibility (1.1.4 and before) - #[clap(long, alias = "validators", value_delimiter = ',')] + #[arg(long, alias = "validators", value_delimiter = ',')] nym_apis: Option>, - #[clap(short, long, default_value_t = OutputFormat::default())] + /// Comma separated list of endpoints of the nyxd validators + #[arg(long, value_delimiter = ',')] + nyxd_urls: Option>, + + /// Specifies whether this node should accepts and send out packets that would only go to nodes + /// on the next mix layer + #[arg(long)] + enforce_forward_travel: Option, + + #[arg(short, long, default_value_t = OutputFormat::default())] output: OutputFormat, } @@ -54,7 +63,9 @@ impl From for OverrideConfig { mix_port: run_config.mix_port, verloc_port: run_config.verloc_port, http_api_port: run_config.http_api_port, + enforce_forward_travel: run_config.enforce_forward_travel, nym_apis: run_config.nym_apis, + nyxd_urls: run_config.nyxd_urls, } } } diff --git a/mixnode/src/config/mod.rs b/mixnode/src/config/mod.rs index a1ab25a752c..42ffff21fea 100644 --- a/mixnode/src/config/mod.rs +++ b/mixnode/src/config/mod.rs @@ -175,11 +175,19 @@ impl Config { } // builder methods + #[must_use] pub fn with_custom_nym_apis(mut self, nym_api_urls: Vec) -> Self { self.mixnode.nym_api_urls = nym_api_urls; self } + #[must_use] + pub fn with_custom_nyxd(mut self, nyxd_urls: Vec) -> Self { + self.mixnode.nyxd_urls = nyxd_urls; + self + } + + #[must_use] pub fn with_listening_address(mut self, listening_address: IpAddr) -> Self { self.mixnode.listening_address = listening_address; @@ -189,22 +197,31 @@ impl Config { self } + #[must_use] pub fn with_mix_port(mut self, port: u16) -> Self { self.mixnode.mix_port = port; self } + #[must_use] pub fn with_verloc_port(mut self, port: u16) -> Self { self.mixnode.verloc_port = port; self } + #[must_use] pub fn with_http_api_port(mut self, port: u16) -> Self { let http_ip = self.http.bind_address.ip(); self.http.bind_address = SocketAddr::new(http_ip, port); self } + #[must_use] + pub fn with_enforce_forward_travel(mut self, forward_travel: bool) -> Self { + self.debug.enforce_forward_travel = forward_travel; + self + } + pub fn get_nym_api_endpoints(&self) -> Vec { self.mixnode.nym_api_urls.clone() } @@ -231,6 +248,9 @@ pub struct MixNode { /// Addresses to nym APIs from which the node gets the view of the network. pub nym_api_urls: Vec, + + /// Addresses to nyxd validators via which the node can communicate with the chain directly. + pub nyxd_urls: Vec, } impl MixNode { @@ -242,6 +262,7 @@ impl MixNode { mix_port: DEFAULT_MIX_LISTENING_PORT, verloc_port: DEFAULT_VERLOC_LISTENING_PORT, nym_api_urls: vec![Url::from_str(mainnet::NYM_API).expect("Invalid default API URL")], + nyxd_urls: vec![Url::from_str(mainnet::NYXD_URL).expect("Invalid default nyxd URL")], } } } @@ -319,6 +340,10 @@ pub struct Debug { /// Maximum number of packets that can be stored waiting to get sent to a particular connection. pub maximum_connection_buffer_size: usize, + /// Specifies whether this node should accepts and send out packets that would only go to nodes + /// on the next mix layer. + pub enforce_forward_travel: bool, + /// Specifies whether the mixnode should be using the legacy framing for the sphinx packets. // it's set to true by default. The reason for that decision is to preserve compatibility with the // existing nodes whilst everyone else is upgrading and getting the code for handling the new field. @@ -335,6 +360,9 @@ impl Default for Debug { packet_forwarding_maximum_backoff: DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF, initial_connection_timeout: DEFAULT_INITIAL_CONNECTION_TIMEOUT, maximum_connection_buffer_size: DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE, + + // let's keep it disabled for now to not surprise operators/users + enforce_forward_travel: false, use_legacy_framed_packet_version: false, } } diff --git a/mixnode/src/config/old_config_v1_1_32.rs b/mixnode/src/config/old_config_v1_1_32.rs index a17cdeadcdc..9043269033f 100644 --- a/mixnode/src/config/old_config_v1_1_32.rs +++ b/mixnode/src/config/old_config_v1_1_32.rs @@ -75,6 +75,8 @@ impl ConfigV1_1_32 { impl From for Config { fn from(value: ConfigV1_1_32) -> Self { + let network = nym_config::defaults::NymNetworkDetails::new_from_env(); + Config { // \/ ADDED save_path: None, @@ -104,6 +106,18 @@ impl From for Config { mix_port: value.mixnode.mix_port, verloc_port: value.mixnode.verloc_port, nym_api_urls: value.mixnode.nym_api_urls, + + // \/ ADDED + nyxd_urls: network + .endpoints + .into_iter() + .map(|e| { + e.nyxd_url + .parse() + .expect("malformed nyxd url in environment") + }) + .collect(), + // /\ ADDED }, storage_paths: value.storage_paths, verloc: value.verloc.into(), @@ -243,6 +257,7 @@ impl From for Debug { packet_forwarding_maximum_backoff: value.packet_forwarding_maximum_backoff, initial_connection_timeout: value.initial_connection_timeout, maximum_connection_buffer_size: value.maximum_connection_buffer_size, + enforce_forward_travel: false, use_legacy_framed_packet_version: value.use_legacy_framed_packet_version, } } diff --git a/mixnode/src/config/template.rs b/mixnode/src/config/template.rs index 79a4dd72a8a..e1d8515c034 100644 --- a/mixnode/src/config/template.rs +++ b/mixnode/src/config/template.rs @@ -49,6 +49,13 @@ nym_api_urls = [ {{/each}} ] +# Addresses to nyxd validators via which the node can communicate with the chain directly. +nyxd_urls = [ + {{#each mixnode.nyxd_urls }} + '{{this}}', + {{/each}} +] + [http] # Socket address this node will use for binding its http API. # default: `0.0.0.0:8000` @@ -80,4 +87,10 @@ node_description = '{{ storage_paths.node_description }}' # TODO +[debug] + +# Specifies whether this node should accepts and send out packets that would only go to nodes +# on the next mix layer. +enforce_forward_travel = {{ debug.enforce_forward_travel }} + "#; diff --git a/mixnode/src/error.rs b/mixnode/src/error.rs index 57c45947478..da6601257f5 100644 --- a/mixnode/src/error.rs +++ b/mixnode/src/error.rs @@ -1,6 +1,7 @@ // Copyright 2023 - Nym Technologies SA // SPDX-License-Identifier: GPL-3.0-only +use nym_mixnode_common::forward_travel::error::ForwardTravelError; use std::io; use std::path::PathBuf; use thiserror::Error; @@ -44,7 +45,24 @@ pub enum MixnodeError { source: io::Error, }, + #[error("experienced an error during shutdown: {message}")] + ShutdownFailure { message: String }, + + #[error("failure in enforcing forward travel of mix packets: {source}")] + ForwardTravel { + #[from] + source: ForwardTravelError, + }, + // TODO: in the future this should work the other way, i.e. NymNode depending on Gateway errors #[error(transparent)] NymNodeError(#[from] nym_node::error::NymNodeError), } + +impl MixnodeError { + pub(crate) fn shutdown_failure(err: Box) -> Self { + MixnodeError::ShutdownFailure { + message: err.to_string(), + } + } +} diff --git a/mixnode/src/node/listener/connection_handler/mod.rs b/mixnode/src/node/listener/connection_handler/mod.rs index c16a700bfd8..90c8e64fa9c 100644 --- a/mixnode/src/node/listener/connection_handler/mod.rs +++ b/mixnode/src/node/listener/connection_handler/mod.rs @@ -7,8 +7,8 @@ use crate::node::listener::connection_handler::packet_processing::{ use crate::node::packet_delayforwarder::PacketDelayForwardSender; use crate::node::TaskClient; use futures::StreamExt; -use log::debug; -use log::{error, info, warn}; +use log::{debug, error, info, warn}; +use nym_mixnode_common::forward_travel::AllowedEgress; use nym_mixnode_common::measure; use nym_sphinx::forwarding::packet::MixPacket; use nym_sphinx::framing::codec::NymCodec; @@ -26,22 +26,37 @@ pub(crate) mod packet_processing; #[derive(Clone)] pub(crate) struct ConnectionHandler { + allowed_egress: AllowedEgress, packet_processor: PacketProcessor, delay_forwarding_channel: PacketDelayForwardSender, } impl ConnectionHandler { pub(crate) fn new( + allowed_egress: AllowedEgress, packet_processor: PacketProcessor, delay_forwarding_channel: PacketDelayForwardSender, ) -> Self { ConnectionHandler { + allowed_egress, packet_processor, delay_forwarding_channel, } } fn delay_and_forward_packet(&self, mix_packet: MixPacket, delay: Option) { + let next_hop: SocketAddr = mix_packet.next_hop().into(); + + // TODO: another option is to move this filter + // (which is used by EVERY `ConnectionHandler`, so potentially hundreds of times) + // to the mixnet client where we could be filtering at the time of attempting to open new outbound connections + // However, in that case we'd have gone through the troubles of possibly unnecessarily delaying the packet + if !self.allowed_egress.is_allowed(next_hop.ip()) { + // TODO: perhaps this should get lowered in severity? + warn!("received an packet that was meant to get forwarded to {next_hop}, but this address does not belong to any node on the next layer - dropping the packet"); + return; + } + // determine instant at which packet should get forwarded. this way we minimise effect of // being stuck in the queue [of the channel] to get inserted into the delay queue let forward_instant = delay.map(|delay| Instant::now() + delay.to_duration()); diff --git a/mixnode/src/node/listener/mod.rs b/mixnode/src/node/listener/mod.rs index 995a6e7c73b..39178814d1c 100644 --- a/mixnode/src/node/listener/mod.rs +++ b/mixnode/src/node/listener/mod.rs @@ -3,6 +3,7 @@ use crate::node::listener::connection_handler::ConnectionHandler; use log::{error, info, warn}; +use nym_mixnode_common::forward_travel::AllowedIngress; use std::net::SocketAddr; use std::process; use tokio::net::TcpListener; @@ -14,12 +15,21 @@ pub(crate) mod connection_handler; pub(crate) struct Listener { address: SocketAddr, + allowed_ingress: AllowedIngress, shutdown: TaskClient, } impl Listener { - pub(crate) fn new(address: SocketAddr, shutdown: TaskClient) -> Self { - Listener { address, shutdown } + pub(crate) fn new( + address: SocketAddr, + allowed_ingress: AllowedIngress, + shutdown: TaskClient, + ) -> Self { + Listener { + address, + allowed_ingress, + shutdown, + } } async fn run(&mut self, connection_handler: ConnectionHandler) { @@ -41,6 +51,12 @@ impl Listener { connection = listener.accept() => { match connection { Ok((socket, remote_addr)) => { + if !self.allowed_ingress.is_allowed(remote_addr.ip()) { + // TODO: perhaps this should get lowered in severity? + warn!("received an incoming connection from {remote_addr}, but this address does not belong to any node on the previous layer - dropping the connection"); + continue + } + let handler = connection_handler.clone(); tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone())); } diff --git a/mixnode/src/node/mod.rs b/mixnode/src/node/mod.rs index 5dca2113dcd..ab19c66bc9c 100644 --- a/mixnode/src/node/mod.rs +++ b/mixnode/src/node/mod.rs @@ -15,7 +15,9 @@ use crate::node::packet_delayforwarder::{DelayForwarder, PacketDelayForwardSende use log::{error, info, warn}; use nym_bin_common::output_format::OutputFormat; use nym_bin_common::version_checker::parse_version; +use nym_config::defaults::NymNetworkDetails; use nym_crypto::asymmetric::{encryption, identity}; +use nym_mixnode_common::forward_travel::{AllowedAddressesProvider, AllowedEgress, AllowedIngress}; use nym_mixnode_common::verloc::{self, AtomicVerlocResult, VerlocMeasurer}; use nym_task::{TaskClient, TaskManager}; use rand::seq::SliceRandom; @@ -101,6 +103,8 @@ impl MixNode { &self, node_stats_update_sender: node_statistics::UpdateSender, delay_forwarding_channel: PacketDelayForwardSender, + ingress: AllowedIngress, + egress: AllowedEgress, shutdown: TaskClient, ) { info!("Starting socket listener..."); @@ -108,14 +112,37 @@ impl MixNode { let packet_processor = PacketProcessor::new(self.sphinx_keypair.private_key(), node_stats_update_sender); - let connection_handler = ConnectionHandler::new(packet_processor, delay_forwarding_channel); + let connection_handler = + ConnectionHandler::new(egress, packet_processor, delay_forwarding_channel); let listening_address = SocketAddr::new( self.config.mixnode.listening_address, self.config.mixnode.mix_port, ); - Listener::new(listening_address, shutdown).start(connection_handler); + Listener::new(listening_address, ingress, shutdown).start(connection_handler); + } + + async fn start_allowed_addresses_provider( + &self, + task_client: TaskClient, + ) -> Result<(AllowedIngress, AllowedEgress), MixnodeError> { + let identity = self.identity_keypair.public_key().to_base58_string(); + let nyxd_endpoints = self.config.mixnode.nyxd_urls.clone(); + + let network = NymNetworkDetails::new_from_env(); + let mut provider = AllowedAddressesProvider::new( + identity, + nyxd_endpoints, + !self.config.debug.enforce_forward_travel, + Some(network), + ) + .await?; + + let filters = (provider.ingress(), provider.egress()); + + tokio::spawn(async move { provider.run(task_client).await }); + Ok(filters) } fn start_packet_delay_forwarder( @@ -215,12 +242,16 @@ impl MixNode { }) } - async fn wait_for_interrupt(&self, shutdown: TaskManager) { - let _res = shutdown.catch_interrupt().await; - log::info!("Stopping nym mixnode"); + async fn wait_for_interrupt( + &self, + shutdown: TaskManager, + ) -> Result<(), Box> { + let res = shutdown.catch_interrupt().await; + log::info!("Stopping nym gateway"); + res } - pub async fn run(&mut self) -> Result<(), MixnodeError> { + pub(crate) async fn run(&mut self) -> Result<(), MixnodeError> { info!("Starting nym mixnode"); if self.check_if_bonded().await { @@ -229,6 +260,12 @@ impl MixNode { let shutdown = TaskManager::default(); + let (ingress, egress) = self + .start_allowed_addresses_provider( + shutdown.subscribe().named("AllowedAddressesProvider"), + ) + .await?; + let (node_stats_pointer, node_stats_update_sender) = self .start_node_stats_controller(shutdown.subscribe().named("node_statistics::Controller")); let delay_forwarding_channel = self.start_packet_delay_forwarder( @@ -238,6 +275,8 @@ impl MixNode { self.start_socket_listener( node_stats_update_sender, delay_forwarding_channel, + ingress, + egress, shutdown.subscribe().named("Listener"), ); let atomic_verloc_results = @@ -253,7 +292,9 @@ impl MixNode { )?; info!("Finished nym mixnode startup procedure - it should now be able to receive mix traffic!"); - self.wait_for_interrupt(shutdown).await; - Ok(()) + + self.wait_for_interrupt(shutdown) + .await + .map_err(MixnodeError::shutdown_failure) } } diff --git a/nym-api/Cargo.toml b/nym-api/Cargo.toml index 243e152d52a..1079092563c 100644 --- a/nym-api/Cargo.toml +++ b/nym-api/Cargo.toml @@ -38,13 +38,13 @@ serde_json = { workspace = true } tap = "1.0" thiserror = { workspace = true } time = { workspace = true, features = ["serde-human-readable", "parsing"] } -tokio = { version = "1.24.1", features = [ +tokio = { workspace = true, features = [ "rt-multi-thread", "macros", "signal", "time", ] } -tokio-stream = "0.1.11" +tokio-stream = { workspace = true } url = { workspace = true } ts-rs = { workspace = true, optional = true} diff --git a/nym-connect/desktop/Cargo.lock b/nym-connect/desktop/Cargo.lock index 90510b9aa9f..bfc7e6f5d0f 100644 --- a/nym-connect/desktop/Cargo.lock +++ b/nym-connect/desktop/Cargo.lock @@ -7448,9 +7448,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.31.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40de3a2ba249dcb097e01be5e67a5ff53cf250397715a071a81543e8a832a920" +checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" dependencies = [ "backtrace", "bytes", @@ -7557,9 +7557,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" dependencies = [ "bytes", "futures-core", diff --git a/nym-wallet/Cargo.lock b/nym-wallet/Cargo.lock index c7dea1c888c..cfaa513c95c 100644 --- a/nym-wallet/Cargo.lock +++ b/nym-wallet/Cargo.lock @@ -5797,9 +5797,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.31.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40de3a2ba249dcb097e01be5e67a5ff53cf250397715a071a81543e8a832a920" +checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" dependencies = [ "backtrace", "bytes", diff --git a/scripts/localnet_start.sh b/scripts/localnet_start.sh index ac2a059547d..aafb24a8e04 100755 --- a/scripts/localnet_start.sh +++ b/scripts/localnet_start.sh @@ -8,6 +8,7 @@ localnetdir="$HOME/.nym/localnets/localnet.$suffix" mkdir -p "$localnetdir" echo "Using $localnetdir for the localnet" +cargo build --release # initialise mixnet echo "initialising mixnode1..." diff --git a/sdk/rust/nym-sdk/Cargo.toml b/sdk/rust/nym-sdk/Cargo.toml index fb5cbf4ac8b..88df22adc32 100644 --- a/sdk/rust/nym-sdk/Cargo.toml +++ b/sdk/rust/nym-sdk/Cargo.toml @@ -48,8 +48,8 @@ nym-bin-common = { path = "../../../common/bin-common" } # extra dependencies for libp2p examples libp2p = { git = "https://github.com/ChainSafe/rust-libp2p.git", rev = "e3440d25681df380c9f0f8cfdcfd5ecc0a4f2fb6", features = [ "identify", "macros", "ping", "tokio", "tcp", "dns", "websocket", "noise", "mplex", "yamux", "gossipsub" ]} -tokio-stream = "0.1.12" -tokio-util = { version = "0.7", features = ["codec"] } +tokio-stream = { workspace = true } +tokio-util = { workspace = true, features = ["codec"] } parking_lot = "0.12" hex = "0.4"