diff --git a/Cargo.lock b/Cargo.lock index c3df2e1c05f..13805813c5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5215,6 +5215,7 @@ dependencies = [ "nym-sphinx", "nym-task", "nym-topology", + "nym-topology-control", "nym-validator-client", "rand 0.7.3", "reqwest", @@ -5538,10 +5539,13 @@ dependencies = [ "nym-network-defaults", "nym-network-requester", "nym-node", + "nym-noise", "nym-pemstore", "nym-sphinx", "nym-statistics-common", "nym-task", + "nym-topology", + "nym-topology-control", "nym-types", "nym-validator-client", "nym-wireguard", @@ -5723,10 +5727,16 @@ version = "0.1.0" dependencies = [ "futures", "log", + "nym-crypto", + "nym-noise", "nym-sphinx", "nym-task", + "nym-topology-control", + "nym-validator-client", + "rand 0.7.3", "tokio", "tokio-util", + "url", ] [[package]] @@ -5773,6 +5783,7 @@ dependencies = [ "nym-mixnet-client", "nym-mixnode-common", "nym-node", + "nym-noise", "nym-nonexhaustive-delayqueue", "nym-pemstore", "nym-sphinx", @@ -5780,6 +5791,7 @@ dependencies = [ "nym-sphinx-types", "nym-task", "nym-topology", + "nym-topology-control", "nym-types", "nym-validator-client", "opentelemetry", @@ -6030,6 +6042,24 @@ dependencies = [ "wasmtimer", ] +[[package]] +name = "nym-noise" +version = "0.1.0" +dependencies = [ + "bytes", + "futures", + "log", + "nym-crypto", + "nym-topology", + "pin-project", + "semver 0.11.0", + "sha2 0.10.8", + "snow", + "thiserror", + "tokio", + "tokio-util", +] + [[package]] name = "nym-nonexhaustive-delayqueue" version = "0.1.0" @@ -6122,6 +6152,7 @@ dependencies = [ "nym-sphinx", "nym-task", "nym-topology", + "nym-topology-control", "nym-validator-client", "parking_lot 0.12.1", "pretty_env_logger", @@ -6187,6 +6218,7 @@ dependencies = [ "nym-socks5-client-core", "nym-sphinx", "nym-topology", + "nym-topology-control", "rand 0.7.3", "serde", "serde_json", @@ -6499,6 +6531,29 @@ dependencies = [ "wasm-utils", ] +[[package]] +name = "nym-topology-control" +version = "0.1.0" +dependencies = [ + "async-trait", + "futures", + "gloo-timers", + "log", + "nym-explorer-client", + "nym-network-defaults", + "nym-sphinx", + "nym-task", + "nym-topology", + "nym-validator-client", + "rand 0.7.3", + "serde", + "tap", + "tokio", + "tokio-stream", + "url", + "wasmtimer", +] + [[package]] name = "nym-tun" version = "0.1.0" @@ -6814,9 +6869,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.98" +version = "0.9.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1665caf8ab2dc9aef43d1c0023bd904633a6a05cb30b0ad59bec2ae986e57a7" +checksum = "db4d56a4c0478783083cfafcc42493dd4a981d41669da64b4572a2a089b51b1d" dependencies = [ "cc", "libc", @@ -7564,9 +7619,9 @@ dependencies = [ [[package]] name = "psl" -version = "2.1.14" +version = "2.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "383703acfc34f7a00724846c14dc5ea4407c59e5aedcbbb18a1c0c1a23fe5013" +checksum = "fa35143bed048dcb22457ef82f8ba3008b842e9158e2cfcc904f5a4e2571cd4c" dependencies = [ "psl-types", ] @@ -8808,9 +8863,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.4.0" +version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64cd236ccc1b7a29e7e2739f27c0b2dd199804abc4290e32f59f3b68d6405c23" +checksum = "f5c9fdb6b00a489875b22efd4b78fe2b363b72265cc5f6eb2e2b9ee270e6140c" dependencies = [ "base64 0.21.4", "chrono", @@ -8825,9 +8880,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.4.0" +version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93634eb5f75a2323b16de4748022ac4297f9e76b6dced2be287a099f41b5e788" +checksum = "dbff351eb4b33600a2e138dfa0b10b65a238ea8ff8fb2387c422c5022a3e8298" dependencies = [ "darling 0.20.3", "proc-macro2", diff --git a/clients/socks5/Cargo.toml b/clients/socks5/Cargo.toml index 2665f6355c4..d1e975232d7 100644 --- a/clients/socks5/Cargo.toml +++ b/clients/socks5/Cargo.toml @@ -34,6 +34,7 @@ nym-sphinx = { path = "../../common/nymsphinx" } nym-ordered-buffer = { path = "../../common/socks5/ordered-buffer" } nym-pemstore = { path = "../../common/pemstore" } nym-topology = { path = "../../common/topology" } +nym-topology-control = { path = "../../common/topology-control" } nym-socks5-client-core = { path = "../../common/socks5-client-core" } nym-id = { path = "../../common/nym-id" } diff --git a/clients/socks5/src/commands/mod.rs b/clients/socks5/src/commands/mod.rs index da3166fd7e3..dc431b78f5e 100644 --- a/clients/socks5/src/commands/mod.rs +++ b/clients/socks5/src/commands/mod.rs @@ -16,11 +16,11 @@ use nym_client_core::client::base_client::storage::gateway_details::{ OnDiskGatewayDetails, PersistedGatewayDetails, }; use nym_client_core::client::key_manager::persistence::OnDiskKeys; -use nym_client_core::client::topology_control::geo_aware_provider::CountryGroup; -use nym_client_core::config::{GatewayEndpointConfig, GroupBy, TopologyStructure}; +use nym_client_core::config::{GatewayEndpointConfig, TopologyStructure}; use nym_client_core::error::ClientCoreError; use nym_config::OptionalSet; use nym_sphinx::params::{PacketSize, PacketType}; +use nym_topology_control::geo_aware_provider::{CountryGroup, GroupBy}; use std::error::Error; use std::net::IpAddr; use std::sync::OnceLock; diff --git a/clients/socks5/src/commands/run.rs b/clients/socks5/src/commands/run.rs index 5638c80d9ba..edb1c6ec875 100644 --- a/clients/socks5/src/commands/run.rs +++ b/clients/socks5/src/commands/run.rs @@ -12,9 +12,9 @@ use log::*; use nym_bin_common::version_checker::is_minor_version_compatible; use nym_client_core::cli_helpers::client_run::CommonClientRunArgs; use nym_client_core::client::base_client::storage::OnDiskPersistent; -use nym_client_core::client::topology_control::geo_aware_provider::CountryGroup; use nym_socks5_client_core::NymClient; use nym_sphinx::addressing::clients::Recipient; +use nym_topology_control::geo_aware_provider::CountryGroup; use std::net::IpAddr; #[derive(Args, Clone)] diff --git a/common/client-core/Cargo.toml b/common/client-core/Cargo.toml index dc6c9bba2b2..a9776ff911f 100644 --- a/common/client-core/Cargo.toml +++ b/common/client-core/Cargo.toml @@ -42,6 +42,7 @@ nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" } nym-sphinx = { path = "../nymsphinx" } nym-pemstore = { path = "../pemstore" } nym-topology = { path = "../topology", features = ["serializable"] } +nym-topology-control = { path = "../topology-control" } nym-validator-client = { path = "../client-libs/validator-client", default-features = false } nym-task = { path = "../task" } nym-credential-storage = { path = "../credential-storage" } diff --git a/common/client-core/src/client/base_client/mod.rs b/common/client-core/src/client/base_client/mod.rs index c2632f73add..62b5aca7ef0 100644 --- a/common/client-core/src/client/base_client/mod.rs +++ b/common/client-core/src/client/base_client/mod.rs @@ -3,7 +3,6 @@ use super::packet_statistics_control::PacketStatisticsReporter; use super::received_buffer::ReceivedBufferMessage; -use super::topology_control::geo_aware_provider::GeoAwareTopologyProvider; use crate::client::base_client::storage::gateway_details::GatewayDetailsStore; use crate::client::base_client::storage::MixnetClientStorage; use crate::client::cover_traffic_stream::LoopCoverTrafficStream; @@ -22,10 +21,6 @@ use crate::client::replies::reply_controller::{ReplyControllerReceiver, ReplyCon use crate::client::replies::reply_storage::{ CombinedReplyStorage, PersistentReplyStorage, ReplyStorageBackend, SentReplyKeys, }; -use crate::client::topology_control::nym_api_provider::NymApiTopologyProvider; -use crate::client::topology_control::{ - TopologyAccessor, TopologyRefresher, TopologyRefresherConfig, -}; use crate::config::{Config, DebugConfig}; use crate::error::ClientCoreError; use crate::init::{ @@ -50,6 +45,9 @@ use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender, use nym_task::{TaskClient, TaskHandle}; use nym_topology::provider_trait::TopologyProvider; use nym_topology::HardcodedTopologyProvider; +use nym_topology_control::geo_aware_provider::GeoAwareTopologyProvider; +use nym_topology_control::nym_api_provider::NymApiTopologyProvider; +use nym_topology_control::{TopologyAccessor, TopologyRefresher, TopologyRefresherConfig}; use nym_validator_client::nyxd::contract_traits::DkgQueryClient; use std::fmt::Debug; use std::os::raw::c_int as RawFd; diff --git a/common/client-core/src/client/cover_traffic_stream.rs b/common/client-core/src/client/cover_traffic_stream.rs index 101a0a77548..d650fb16f91 100644 --- a/common/client-core/src/client/cover_traffic_stream.rs +++ b/common/client-core/src/client/cover_traffic_stream.rs @@ -3,7 +3,6 @@ use crate::client::mix_traffic::BatchMixMessageSender; use crate::client::packet_statistics_control::{PacketStatisticsEvent, PacketStatisticsReporter}; -use crate::client::topology_control::TopologyAccessor; use crate::{config, spawn_future}; use futures::task::{Context, Poll}; use futures::{Future, Stream, StreamExt}; @@ -13,6 +12,7 @@ use nym_sphinx::addressing::clients::Recipient; use nym_sphinx::cover::generate_loop_cover_packet; use nym_sphinx::params::{PacketSize, PacketType}; use nym_sphinx::utils::sample_poisson_duration; +use nym_topology_control::TopologyAccessor; use rand::{rngs::OsRng, CryptoRng, Rng}; use std::pin::Pin; use std::sync::Arc; diff --git a/common/client-core/src/client/mod.rs b/common/client-core/src/client/mod.rs index 9a3a3a33c99..93340e50cfb 100644 --- a/common/client-core/src/client/mod.rs +++ b/common/client-core/src/client/mod.rs @@ -11,5 +11,4 @@ pub(crate) mod packet_statistics_control; pub mod real_messages_control; pub mod received_buffer; pub mod replies; -pub mod topology_control; pub(crate) mod transmission_buffer; diff --git a/common/client-core/src/client/real_messages_control/message_handler.rs b/common/client-core/src/client/real_messages_control/message_handler.rs index 42a19938440..11bb514c661 100644 --- a/common/client-core/src/client/real_messages_control/message_handler.rs +++ b/common/client-core/src/client/real_messages_control/message_handler.rs @@ -7,7 +7,6 @@ use crate::client::real_messages_control::real_traffic_stream::{ }; use crate::client::real_messages_control::{AckActionSender, Action}; use crate::client::replies::reply_storage::{ReceivedReplySurbsMap, SentReplyKeys, UsedSenderTags}; -use crate::client::topology_control::{TopologyAccessor, TopologyReadPermit}; use log::{debug, error, info, trace, warn}; use nym_sphinx::acknowledgements::AckKey; use nym_sphinx::addressing::clients::Recipient; @@ -20,6 +19,7 @@ use nym_sphinx::preparer::{MessagePreparer, PreparedFragment}; use nym_sphinx::Delay; use nym_task::connections::TransmissionLane; use nym_topology::{NymTopology, NymTopologyError}; +use nym_topology_control::{TopologyAccessor, TopologyReadPermit}; use rand::{CryptoRng, Rng}; use std::collections::HashMap; use std::sync::Arc; diff --git a/common/client-core/src/client/real_messages_control/mod.rs b/common/client-core/src/client/real_messages_control/mod.rs index d4914f14406..1aef76ebe77 100644 --- a/common/client-core/src/client/real_messages_control/mod.rs +++ b/common/client-core/src/client/real_messages_control/mod.rs @@ -17,7 +17,6 @@ use crate::{ client::{ inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender, real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors, - topology_control::TopologyAccessor, }, spawn_future, }; @@ -28,6 +27,7 @@ use nym_sphinx::acknowledgements::AckKey; use nym_sphinx::addressing::clients::Recipient; use nym_sphinx::params::PacketType; use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths}; +use nym_topology_control::TopologyAccessor; use rand::{rngs::OsRng, CryptoRng, Rng}; use std::sync::Arc; diff --git a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs index ef91208f1ba..a742b20e203 100644 --- a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs +++ b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs @@ -5,7 +5,6 @@ use self::sending_delay_controller::SendingDelayController; use crate::client::mix_traffic::BatchMixMessageSender; use crate::client::packet_statistics_control::{PacketStatisticsEvent, PacketStatisticsReporter}; use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender; -use crate::client::topology_control::TopologyAccessor; use crate::client::transmission_buffer::TransmissionBuffer; use crate::config; use futures::task::{Context, Poll}; @@ -22,6 +21,7 @@ use nym_sphinx::utils::sample_poisson_duration; use nym_task::connections::{ ConnectionCommand, ConnectionCommandReceiver, ConnectionId, LaneQueueLengths, TransmissionLane, }; +use nym_topology_control::TopologyAccessor; use rand::{CryptoRng, Rng}; use std::pin::Pin; use std::sync::Arc; diff --git a/common/client-core/src/config/mod.rs b/common/client-core/src/config/mod.rs index 3f4bbb4efec..a52b5f8c4dd 100644 --- a/common/client-core/src/config/mod.rs +++ b/common/client-core/src/config/mod.rs @@ -1,14 +1,12 @@ // Copyright 2021-2023 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 -use crate::{client::topology_control::geo_aware_provider::CountryGroup, error::ClientCoreError}; +use crate::error::ClientCoreError; use nym_config::defaults::NymNetworkDetails; use nym_crypto::asymmetric::identity; use nym_gateway_client::client::GatewayConfig; -use nym_sphinx::{ - addressing::clients::Recipient, - params::{PacketSize, PacketType}, -}; +use nym_sphinx::params::{PacketSize, PacketType}; +use nym_topology_control::geo_aware_provider::GroupBy; use serde::{Deserialize, Serialize}; use std::time::Duration; use url::Url; @@ -541,22 +539,6 @@ pub enum TopologyStructure { GeoAware(GroupBy), } -#[allow(clippy::large_enum_variant)] -#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)] -pub enum GroupBy { - CountryGroup(CountryGroup), - NymAddress(Recipient), -} - -impl std::fmt::Display for GroupBy { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - GroupBy::CountryGroup(group) => write!(f, "group: {}", group), - GroupBy::NymAddress(address) => write!(f, "address: {}", address), - } - } -} - impl Default for Topology { fn default() -> Self { Topology { diff --git a/common/client-core/src/config/old_config_v1_1_30.rs b/common/client-core/src/config/old_config_v1_1_30.rs index 87dae0eb3f4..607fabc7f79 100644 --- a/common/client-core/src/config/old_config_v1_1_30.rs +++ b/common/client-core/src/config/old_config_v1_1_30.rs @@ -1,15 +1,15 @@ // Copyright 2023 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 -use crate::client::topology_control::geo_aware_provider::CountryGroup; use crate::config::{ - Acknowledgements, Client, Config, CoverTraffic, DebugConfig, GatewayConnection, GroupBy, - ReplySurbs, Topology, TopologyStructure, Traffic, + Acknowledgements, Client, Config, CoverTraffic, DebugConfig, GatewayConnection, ReplySurbs, + Topology, TopologyStructure, Traffic, }; use nym_sphinx::{ addressing::clients::Recipient, params::{PacketSize, PacketType}, }; +use nym_topology_control::geo_aware_provider::{CountryGroup, GroupBy}; use serde::{Deserialize, Serialize}; use std::time::Duration; use url::Url; diff --git a/common/client-libs/mixnet-client/Cargo.toml b/common/client-libs/mixnet-client/Cargo.toml index 68e048d1f0b..5e0fb12e5ec 100644 --- a/common/client-libs/mixnet-client/Cargo.toml +++ b/common/client-libs/mixnet-client/Cargo.toml @@ -16,3 +16,11 @@ tokio-util = { workspace = true, features = ["codec"] } # internal nym-sphinx = { path = "../../nymsphinx" } nym-task = { path = "../../task" } +nym-topology-control = { path = "../../topology-control" } +nym-noise = { path = "../../nymnoise"} +nym-crypto = { path = "../../crypto" } +nym-validator-client = { path = "../validator-client"} + +[dev-dependencies] +url = { workspace = true } +rand = "0.7.3" diff --git a/common/client-libs/mixnet-client/src/client.rs b/common/client-libs/mixnet-client/src/client.rs index 4da308144f1..6d53deccd50 100644 --- a/common/client-libs/mixnet-client/src/client.rs +++ b/common/client-libs/mixnet-client/src/client.rs @@ -4,11 +4,15 @@ use futures::channel::mpsc; use futures::StreamExt; use log::*; +use nym_crypto::asymmetric::encryption; +use nym_noise::upgrade_noise_initiator_with_topology; use nym_sphinx::addressing::nodes::NymNodeRoutingAddress; use nym_sphinx::framing::codec::NymCodec; use nym_sphinx::framing::packet::FramedNymPacket; use nym_sphinx::params::PacketType; use nym_sphinx::NymPacket; +use nym_topology_control::accessor::TopologyAccessor; +use nym_validator_client::NymApiClient; use std::collections::HashMap; use std::io; use std::net::SocketAddr; @@ -59,6 +63,9 @@ pub trait SendWithoutResponse { pub struct Client { conn_new: HashMap, config: Config, + topology_access: TopologyAccessor, + api_client: NymApiClient, + local_identity: Arc, } struct ConnectionSender { @@ -76,10 +83,18 @@ impl ConnectionSender { } impl Client { - pub fn new(config: Config) -> Client { + pub fn new( + config: Config, + topology_access: TopologyAccessor, + api_client: NymApiClient, + local_identity: Arc, + ) -> Client { Client { conn_new: HashMap::new(), config, + topology_access, + api_client, + local_identity, } } @@ -88,25 +103,60 @@ impl Client { receiver: mpsc::Receiver, connection_timeout: Duration, current_reconnection: &AtomicU32, + topology_access: TopologyAccessor, + api_client: NymApiClient, + local_identity: Arc, ) { let connection_fut = TcpStream::connect(address); let conn = match tokio::time::timeout(connection_timeout, connection_fut).await { - Ok(stream_res) => match stream_res { - Ok(stream) => { - debug!("Managed to establish connection to {}", address); - // if we managed to connect, reset the reconnection count (whatever it might have been) - current_reconnection.store(0, Ordering::Release); - Framed::new(stream, NymCodec) - } - Err(err) => { - debug!( - "failed to establish connection to {} (err: {})", - address, err - ); - return; + Ok(stream_res) => { + match stream_res { + Ok(stream) => { + debug!("Managed to establish connection to {}", address); + // if we managed to connect, reset the reconnection count (whatever it might have been) + current_reconnection.store(0, Ordering::Release); + //Get the topology, because we need the keys for the handshake + let Some(topology) = topology_access.current_topology().await else { + error!("Cannot perform Noise handshake to {address}, due to topology error"); + return; + }; + + let epoch_id = match api_client.get_current_epoch_id().await { + Ok(id) => id, + Err(err) => { + error!("Cannot perform Noise handshake to {address}, due to epoch id error - {err}"); + return; + } + }; + + let noise_stream = match upgrade_noise_initiator_with_topology( + stream, + Default::default(), + &topology, + epoch_id, + local_identity.private_key(), + ) + .await + { + Ok(noise_stream) => noise_stream, + Err(err) => { + error!("Failed to perform Noise handshake with {address} - {err}"); + return; + } + }; + debug!("Noise initiator handshake completed for {:?}", address); + Framed::new(noise_stream, NymCodec) + } + Err(err) => { + debug!( + "failed to establish connection to {} (err: {})", + address, err + ); + return; + } } - }, + } Err(_) => { debug!( "failed to connect to {} within {:?}", @@ -175,6 +225,10 @@ impl Client { // copy the value before moving into another task let initial_connection_timeout = self.config.initial_connection_timeout; + let topology_access_clone = self.topology_access.clone(); + let api_client_clone = self.api_client.clone(); + let local_id_key = self.local_identity.clone(); + tokio::spawn(async move { // before executing the manager, wait for what was specified, if anything if let Some(backoff) = backoff { @@ -187,6 +241,9 @@ impl Client { receiver, initial_connection_timeout, ¤t_reconnection_attempt, + topology_access_clone, + api_client_clone, + local_id_key, ) .await }); @@ -253,15 +310,23 @@ impl SendWithoutResponse for Client { #[cfg(test)] mod tests { use super::*; + use rand::rngs::OsRng; + use url::Url; fn dummy_client() -> Client { - Client::new(Config { - initial_reconnection_backoff: Duration::from_millis(10_000), - maximum_reconnection_backoff: Duration::from_millis(300_000), - initial_connection_timeout: Duration::from_millis(1_500), - maximum_connection_buffer_size: 128, - use_legacy_version: false, - }) + let mut rng = OsRng; + Client::new( + Config { + initial_reconnection_backoff: Duration::from_millis(10_000), + maximum_reconnection_backoff: Duration::from_millis(300_000), + initial_connection_timeout: Duration::from_millis(1_500), + maximum_connection_buffer_size: 128, + use_legacy_version: false, + }, + TopologyAccessor::new(), + NymApiClient::new(Url::parse("http://dummy.url").unwrap()), + Arc::new(encryption::KeyPair::new(&mut rng)), + ) } #[test] diff --git a/common/client-libs/mixnet-client/src/forwarder.rs b/common/client-libs/mixnet-client/src/forwarder.rs index 630cc956632..1a6e9365816 100644 --- a/common/client-libs/mixnet-client/src/forwarder.rs +++ b/common/client-libs/mixnet-client/src/forwarder.rs @@ -5,8 +5,11 @@ use crate::client::{Client, Config, SendWithoutResponse}; use futures::channel::mpsc; use futures::StreamExt; use log::*; +use nym_crypto::asymmetric::encryption; use nym_sphinx::forwarding::packet::MixPacket; -use std::time::Duration; +use nym_topology_control::accessor::TopologyAccessor; +use nym_validator_client::NymApiClient; +use std::sync::Arc; pub type MixForwardingSender = mpsc::UnboundedSender; type MixForwardingReceiver = mpsc::UnboundedReceiver; @@ -21,26 +24,22 @@ pub struct PacketForwarder { impl PacketForwarder { pub fn new( - initial_reconnection_backoff: Duration, - maximum_reconnection_backoff: Duration, - initial_connection_timeout: Duration, - maximum_connection_buffer_size: usize, - use_legacy_version: bool, + client_config: Config, + topology_access: TopologyAccessor, + api_client: NymApiClient, + local_identity: Arc, shutdown: nym_task::TaskClient, ) -> (PacketForwarder, MixForwardingSender) { - let client_config = Config::new( - initial_reconnection_backoff, - maximum_reconnection_backoff, - initial_connection_timeout, - maximum_connection_buffer_size, - use_legacy_version, - ); - let (packet_sender, packet_receiver) = mpsc::unbounded(); ( PacketForwarder { - mixnet_client: Client::new(client_config), + mixnet_client: Client::new( + client_config, + topology_access, + api_client, + local_identity, + ), packet_receiver, shutdown, }, diff --git a/common/client-libs/validator-client/src/client.rs b/common/client-libs/validator-client/src/client.rs index 968a9ddbd3d..5e59f17e3b5 100644 --- a/common/client-libs/validator-client/src/client.rs +++ b/common/client-libs/validator-client/src/client.rs @@ -13,7 +13,7 @@ use nym_api_requests::coconut::{ BlindSignRequestBody, BlindedSignatureResponse, FreePassRequest, VerifyCredentialBody, VerifyCredentialResponse, }; -use nym_api_requests::models::{DescribedGateway, MixNodeBondAnnotated}; +use nym_api_requests::models::{DescribedGateway, DescribedNymNode, MixNodeBondAnnotated}; use nym_api_requests::models::{ GatewayCoreStatusResponse, MixnodeCoreStatusResponse, MixnodeStatusResponse, RewardEstimationResponse, StakeSaturationResponse, @@ -291,6 +291,18 @@ impl NymApiClient { Ok(self.nym_api.get_gateways_described().await?) } + pub async fn get_cached_described_nodes( + &self, + ) -> Result, ValidatorClientError> { + Ok(self.nym_api.get_nym_nodes_described().await?) + } + + pub async fn get_current_epoch_id( + &self, + ) -> Result { + Ok(self.nym_api.get_current_epoch().await?.current_epoch_id()) + } + pub async fn get_gateway_core_status_count( &self, identity: IdentityKeyRef<'_>, diff --git a/common/client-libs/validator-client/src/nym_api/mod.rs b/common/client-libs/validator-client/src/nym_api/mod.rs index 5cf79d462ee..a7b254a1280 100644 --- a/common/client-libs/validator-client/src/nym_api/mod.rs +++ b/common/client-libs/validator-client/src/nym_api/mod.rs @@ -15,16 +15,16 @@ pub use nym_api_requests::{ VerifyCredentialBody, VerifyCredentialResponse, }, models::{ - ComputeRewardEstParam, DescribedGateway, GatewayBondAnnotated, GatewayCoreStatusResponse, - GatewayStatusReportResponse, GatewayUptimeHistoryResponse, InclusionProbabilityResponse, - MixNodeBondAnnotated, MixnodeCoreStatusResponse, MixnodeStatusReportResponse, - MixnodeStatusResponse, MixnodeUptimeHistoryResponse, RewardEstimationResponse, - StakeSaturationResponse, UptimeResponse, + ComputeRewardEstParam, DescribedGateway, DescribedNymNode, GatewayBondAnnotated, + GatewayCoreStatusResponse, GatewayStatusReportResponse, GatewayUptimeHistoryResponse, + InclusionProbabilityResponse, MixNodeBondAnnotated, MixnodeCoreStatusResponse, + MixnodeStatusReportResponse, MixnodeStatusResponse, MixnodeUptimeHistoryResponse, + RewardEstimationResponse, StakeSaturationResponse, UptimeResponse, }, }; pub use nym_coconut_dkg_common::types::EpochId; use nym_mixnet_contract_common::mixnode::MixNodeDetails; -use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, MixId}; +use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, Interval, MixId}; use nym_name_service_common::response::NamesListResponse; use nym_service_provider_directory_common::response::ServicesListResponse; @@ -97,6 +97,14 @@ pub trait NymApiClientExt: ApiClient { .await } + async fn get_nym_nodes_described(&self) -> Result, NymAPIError> { + self.get_json( + &[routes::API_VERSION, routes::NYM_NODES, routes::DESCRIBED], + NO_PARAMS, + ) + .await + } + async fn get_active_mixnodes(&self) -> Result, NymAPIError> { self.get_json( &[routes::API_VERSION, routes::MIXNODES, routes::ACTIVE], @@ -144,6 +152,14 @@ pub trait NymApiClientExt: ApiClient { .await } + async fn get_current_epoch(&self) -> Result { + self.get_json( + &[routes::API_VERSION, routes::EPOCH, routes::CURRENT], + NO_PARAMS, + ) + .await + } + async fn get_gateway_report( &self, identity: IdentityKeyRef<'_>, diff --git a/common/client-libs/validator-client/src/nym_api/routes.rs b/common/client-libs/validator-client/src/nym_api/routes.rs index 212ae0a8c0f..b313ebc7f2e 100644 --- a/common/client-libs/validator-client/src/nym_api/routes.rs +++ b/common/client-libs/validator-client/src/nym_api/routes.rs @@ -6,8 +6,12 @@ use nym_network_defaults::NYM_API_VERSION; pub const API_VERSION: &str = NYM_API_VERSION; pub const MIXNODES: &str = "mixnodes"; pub const GATEWAYS: &str = "gateways"; +pub const NYM_NODES: &str = "nym-nodes"; pub const DESCRIBED: &str = "described"; +pub const EPOCH: &str = "epoch"; +pub const CURRENT: &str = "current"; + pub const DETAILED: &str = "detailed"; pub const DETAILED_UNFILTERED: &str = "detailed-unfiltered"; pub const ACTIVE: &str = "active"; diff --git a/common/nymnoise/Cargo.toml b/common/nymnoise/Cargo.toml new file mode 100644 index 00000000000..5c33210a65d --- /dev/null +++ b/common/nymnoise/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "nym-noise" +version = "0.1.0" +authors = ["Simon Wicky "] +edition = "2021" +license.workspace = true + +[dependencies] +snow = "0.9.2" +futures = "0.3" +tokio = { version = "1.24.1", features = ["net","io-util","time"] } +tokio-util = { workspace = true, features = ["codec"] } +pin-project = "1" +log = "0.4.19" +sha2 = "0.10.7" +bytes = "1.0" +thiserror = "1.0.44" +semver = "0.11" + +# internal +nym-topology = { path = "../topology"} +nym-crypto = { path = "../crypto" } \ No newline at end of file diff --git a/common/nymnoise/src/connection.rs b/common/nymnoise/src/connection.rs new file mode 100644 index 00000000000..1c125df7a09 --- /dev/null +++ b/common/nymnoise/src/connection.rs @@ -0,0 +1,72 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use std::io; + +use pin_project::pin_project; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + net::TcpStream, +}; + +use crate::stream::NoiseStream; + +#[pin_project(project = ConnectionProj)] +pub enum Connection { + Tcp(#[pin] TcpStream), + Noise(#[pin] NoiseStream), +} + +impl Connection { + pub fn peer_addr(&self) -> Result { + match self { + Self::Noise(stream) => stream.peer_addr(), + Self::Tcp(stream) => stream.peer_addr(), + } + } +} + +impl AsyncRead for Connection { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + match self.project() { + ConnectionProj::Noise(stream) => stream.poll_read(cx, buf), + ConnectionProj::Tcp(stream) => stream.poll_read(cx, buf), + } + } +} + +impl AsyncWrite for Connection { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + match self.project() { + ConnectionProj::Noise(stream) => stream.poll_write(cx, buf), + ConnectionProj::Tcp(stream) => stream.poll_write(cx, buf), + } + } + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.project() { + ConnectionProj::Noise(stream) => stream.poll_flush(cx), + ConnectionProj::Tcp(stream) => stream.poll_flush(cx), + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.project() { + ConnectionProj::Noise(stream) => stream.poll_shutdown(cx), + ConnectionProj::Tcp(stream) => stream.poll_shutdown(cx), + } + } +} diff --git a/common/nymnoise/src/error.rs b/common/nymnoise/src/error.rs new file mode 100644 index 00000000000..b97ea38f394 --- /dev/null +++ b/common/nymnoise/src/error.rs @@ -0,0 +1,43 @@ +// Copyright 2023 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use snow::Error; +use std::io; +use std::num::TryFromIntError; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum NoiseError { + #[error("encountered a Noise decryption error")] + DecryptionError, + + #[error("encountered a Noise Protocol error - {0}")] + ProtocolError(Error), + + #[error("encountered an IO error - {0}")] + IoError(#[from] io::Error), + + #[error("Incorrect state")] + IncorrectStateError, + + #[error("Handshake timeout")] + HandshakeTimeoutError(#[from] tokio::time::error::Elapsed), + + #[error("Handshake did not complete")] + HandshakeError, + + #[error(transparent)] + IntConversionError(#[from] TryFromIntError), + + #[error("unable to extract public key - {0}")] + EncryptionKeyConversionError(#[from] nym_crypto::asymmetric::encryption::KeyRecoveryError), +} + +impl From for NoiseError { + fn from(err: Error) -> Self { + match err { + Error::Decrypt => NoiseError::DecryptionError, + err => NoiseError::ProtocolError(err), + } + } +} diff --git a/common/nymnoise/src/lib.rs b/common/nymnoise/src/lib.rs new file mode 100644 index 00000000000..7828abcef2e --- /dev/null +++ b/common/nymnoise/src/lib.rs @@ -0,0 +1,148 @@ +// Copyright 2023 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use crate::connection::Connection; +use crate::error::NoiseError; +use crate::stream::{NoisePattern, NoiseStream}; +use log::*; +use nym_crypto::asymmetric::encryption; +use nym_topology::NymTopology; +use sha2::{Digest, Sha256}; +use snow::{error::Prerequisite, Builder, Error}; +use tokio::net::TcpStream; + +pub mod connection; +pub mod error; +pub mod stream; + +const NOISE_PSK_PREFIX: &[u8] = b"NYMTECH_NOISE_dQw4w9WgXcQ"; + +pub async fn upgrade_noise_initiator( + conn: TcpStream, + pattern: NoisePattern, + local_private_key: &encryption::PrivateKey, + remote_pub_key: &encryption::PublicKey, + epoch: u32, +) -> Result { + trace!("Perform Noise Handshake, initiator side"); + + let secret = [ + NOISE_PSK_PREFIX.to_vec(), + remote_pub_key.to_bytes().to_vec(), + epoch.to_be_bytes().to_vec(), + ] + .concat(); + let secret_hash = Sha256::digest(secret); + + let handshake = Builder::new(pattern.as_str().parse()?) + .local_private_key(&local_private_key.to_bytes()) + .remote_public_key(&remote_pub_key.to_bytes()) + .psk(pattern.psk_position(), &secret_hash) + .build_initiator()?; + + let noise_stream = NoiseStream::new(conn, handshake); + + Ok(Connection::Noise(noise_stream.perform_handshake().await?)) +} + +pub async fn upgrade_noise_initiator_with_topology( + conn: TcpStream, + pattern: NoisePattern, + topology: &NymTopology, + epoch: u32, + local_private_key: &encryption::PrivateKey, +) -> Result { + //Get init material + let responder_addr = conn.peer_addr().map_err(|err| { + error!("Unable to extract peer address from connection - {err}"); + Error::Prereq(Prerequisite::RemotePublicKey) + })?; + + let remote_pub_key = match topology.find_node_key_by_mix_host(responder_addr, true) { + Ok(Some(key)) => encryption::PublicKey::from_base58_string(key)?, + Ok(None) => { + warn!( + "{:?} can't speak Noise yet, falling back to TCP", + responder_addr + ); + return Ok(Connection::Tcp(conn)); + } + Err(_) => { + error!( + "Cannot find public key for node with address {:?}", + responder_addr + ); //Do we still pursue a TCP connection or not? + return Err(Error::Prereq(Prerequisite::RemotePublicKey).into()); + } + }; + + upgrade_noise_initiator(conn, pattern, local_private_key, &remote_pub_key, epoch).await +} + +pub async fn upgrade_noise_responder( + conn: TcpStream, + pattern: NoisePattern, + local_public_key: &encryption::PublicKey, + local_private_key: &encryption::PrivateKey, + epoch: u32, +) -> Result { + trace!("Perform Noise Handshake, responder side"); + + let secret = [ + NOISE_PSK_PREFIX.to_vec(), + local_public_key.to_bytes().to_vec(), + epoch.to_be_bytes().to_vec(), + ] + .concat(); + let secret_hash = Sha256::digest(secret); + + let handshake = Builder::new(pattern.as_str().parse()?) + .local_private_key(&local_private_key.to_bytes()) + .psk(pattern.psk_position(), &secret_hash) + .build_responder()?; + + let noise_stream = NoiseStream::new(conn, handshake); + + Ok(Connection::Noise(noise_stream.perform_handshake().await?)) +} + +pub async fn upgrade_noise_responder_with_topology( + conn: TcpStream, + pattern: NoisePattern, + topology: &NymTopology, + epoch: u32, + local_public_key: &encryption::PublicKey, + local_private_key: &encryption::PrivateKey, +) -> Result { + //Get init material + let initiator_addr = match conn.peer_addr() { + Ok(addr) => addr, + Err(err) => { + error!("Unable to extract peer address from connection - {err}"); + return Err(Error::Prereq(Prerequisite::RemotePublicKey).into()); + } + }; + + match topology.find_node_key_by_mix_host(initiator_addr, false) { + Ok(Some(_)) => { + //Existing node supporting Noise + upgrade_noise_responder(conn, pattern, local_public_key, local_private_key, epoch).await + } + Ok(None) => { + //Existing node not supporting Noise yet + warn!( + "{:?} can't speak Noise yet, falling back to TCP", + initiator_addr + ); + Ok(Connection::Tcp(conn)) + } + Err(_) => { + //Non existing node + error!( + "Cannot find public key for node with address {:?}", + initiator_addr + ); //Do we still pursue a TCP connection with that node or not? + Err(Error::Prereq(Prerequisite::RemotePublicKey).into()) + } + } +} diff --git a/common/nymnoise/src/stream.rs b/common/nymnoise/src/stream.rs new file mode 100644 index 00000000000..c2abdcdf04e --- /dev/null +++ b/common/nymnoise/src/stream.rs @@ -0,0 +1,220 @@ +// Copyright 2023 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use crate::error::NoiseError; +use bytes::BytesMut; +use futures::{Sink, SinkExt, Stream, StreamExt}; +use pin_project::pin_project; +use snow::{HandshakeState, TransportState}; +use std::cmp::min; +use std::collections::VecDeque; +use std::io; +use std::pin::Pin; +use std::task::Poll; +use tokio::{ + io::{AsyncRead, AsyncWrite, ReadBuf}, + net::TcpStream, +}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; + +const MAXMSGLEN: usize = 65535; +const TAGLEN: usize = 16; + +#[derive(Default)] +pub enum NoisePattern { + #[default] + XKpsk3, + IKpsk2, +} + +impl NoisePattern { + pub(crate) fn as_str(&self) -> &'static str { + match self { + Self::XKpsk3 => "Noise_XKpsk3_25519_AESGCM_SHA256", + Self::IKpsk2 => "Noise_IKpsk2_25519_ChaChaPoly_BLAKE2s", //Wireguard handshake (not exactly though) + } + } + + pub(crate) fn psk_position(&self) -> u8 { + //automatic parsing, works for correct pattern, more convenient + match self.as_str().find("psk") { + Some(n) => { + let psk_index = n + 3; + let psk_char = self.as_str().chars().nth(psk_index).unwrap(); + psk_char.to_string().parse().unwrap() + //if this fails, it means hardcoded pattern are wrong + } + None => 0, + } + } +} + +/// Wrapper around a TcpStream +#[pin_project] +pub struct NoiseStream { + #[pin] + inner_stream: Framed, + handshake: Option, + noise: Option, + dec_buffer: VecDeque, +} + +impl NoiseStream { + pub(crate) fn new(inner_stream: TcpStream, handshake: HandshakeState) -> NoiseStream { + NoiseStream { + inner_stream: LengthDelimitedCodec::builder() + .length_field_type::() + .new_framed(inner_stream), + handshake: Some(handshake), + noise: None, + dec_buffer: VecDeque::with_capacity(MAXMSGLEN), + } + } + + pub(crate) async fn perform_handshake(mut self) -> Result { + //Check if we are in the correct state + let Some(mut handshake) = self.handshake else { + return Err(NoiseError::IncorrectStateError); + }; + self.handshake = None; + + while !handshake.is_handshake_finished() { + if handshake.is_my_turn() { + self.send_handshake_msg(&mut handshake).await?; + } else { + self.recv_handshake_msg(&mut handshake).await?; + } + } + + self.noise = Some(handshake.into_transport_mode()?); + Ok(self) + } + + async fn send_handshake_msg( + &mut self, + handshake: &mut HandshakeState, + ) -> Result<(), NoiseError> { + let mut buf = BytesMut::zeroed(MAXMSGLEN + TAGLEN); + let len = handshake.write_message(&[], &mut buf)?; + buf.truncate(len); + self.inner_stream.send(buf.into()).await?; + Ok(()) + } + + async fn recv_handshake_msg( + &mut self, + handshake: &mut HandshakeState, + ) -> Result<(), NoiseError> { + match self.inner_stream.next().await { + Some(Ok(msg)) => { + let mut buf = vec![0u8; MAXMSGLEN]; + handshake.read_message(&msg, &mut buf)?; + Ok(()) + } + Some(Err(err)) => Err(NoiseError::IoError(err)), + None => Err(NoiseError::HandshakeError), + } + } + + pub fn peer_addr(&self) -> Result { + self.inner_stream.get_ref().peer_addr() + } +} + +impl AsyncRead for NoiseStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let projected_self = self.project(); + + match projected_self.inner_stream.poll_next(cx) { + Poll::Pending => { + //no new data, waking is already scheduled. + //Nothing new to decrypt, only check if we can return something from dec_storage, happens after + } + + Poll::Ready(Some(Ok(noise_msg))) => { + //We have a new moise msg + let mut dec_msg = vec![0u8; MAXMSGLEN]; + let len = match projected_self.noise { + Some(transport_state) => { + match transport_state.read_message(&noise_msg, &mut dec_msg) { + Ok(len) => len, + Err(_) => return Poll::Ready(Err(io::ErrorKind::InvalidInput.into())), + } + } + None => return Poll::Ready(Err(io::ErrorKind::Other.into())), + }; + projected_self.dec_buffer.extend(&dec_msg[..len]); + } + + Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)), + + //Stream is done, return Ok with nothing in buf + Poll::Ready(None) => return Poll::Ready(Ok(())), + } + + //check and return what we can + let read_len = min(buf.remaining(), projected_self.dec_buffer.len()); + if read_len > 0 { + buf.put_slice( + &projected_self + .dec_buffer + .drain(..read_len) + .collect::>(), + ); + return Poll::Ready(Ok(())); + } + + //If we end up here, it must mean the previous poll_next was pending as well, otherwise something was returned. Hence waking is already scheduled + Poll::Pending + } +} + +impl AsyncWrite for NoiseStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll> { + let mut projected_self = self.project(); + + match projected_self.inner_stream.as_mut().poll_ready(cx) { + Poll::Pending => Poll::Pending, + + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + + Poll::Ready(Ok(())) => { + let mut noise_buf = BytesMut::zeroed(MAXMSGLEN + TAGLEN); + + let Ok(len) = (match projected_self.noise { + Some(transport_state) => transport_state.write_message(buf, &mut noise_buf), + None => return Poll::Ready(Err(io::ErrorKind::Other.into())), + }) else { + return Poll::Ready(Err(io::ErrorKind::InvalidInput.into())); + }; + noise_buf.truncate(len); + match projected_self.inner_stream.start_send(noise_buf.into()) { + Ok(()) => Poll::Ready(Ok(buf.len())), + Err(e) => Poll::Ready(Err(e)), + } + } + } + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.project().inner_stream.poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.project().inner_stream.poll_close(cx) + } +} diff --git a/common/nymsphinx/src/receiver.rs b/common/nymsphinx/src/receiver.rs index 742de7c85e1..65c791b1f93 100644 --- a/common/nymsphinx/src/receiver.rs +++ b/common/nymsphinx/src/receiver.rs @@ -237,7 +237,7 @@ mod message_receiver { mix_id: 123, owner: "foomp1".to_string(), host: "10.20.30.40".parse().unwrap(), - mix_host: "10.20.30.40:1789".parse().unwrap(), + mix_hosts: vec!["10.20.30.40:1789".parse().unwrap()], identity_key: identity::PublicKey::from_base58_string( "3ebjp1Fb9hdcS1AR6AZihgeJiMHkB5jjJUsvqNnfQwU7", ) @@ -257,7 +257,7 @@ mod message_receiver { mix_id: 234, owner: "foomp2".to_string(), host: "11.21.31.41".parse().unwrap(), - mix_host: "11.21.31.41:1789".parse().unwrap(), + mix_hosts: vec!["11.21.31.41:1789".parse().unwrap()], identity_key: identity::PublicKey::from_base58_string( "D6YaMzLSY7mANtSQRKXsmMZpqgqiVkeiagKM4V4oFPFr", ) @@ -277,7 +277,7 @@ mod message_receiver { mix_id: 456, owner: "foomp3".to_string(), host: "12.22.32.42".parse().unwrap(), - mix_host: "12.22.32.42:1789".parse().unwrap(), + mix_hosts: vec!["12.22.32.42:1789".parse().unwrap()], identity_key: identity::PublicKey::from_base58_string( "GkWDysw4AjESv1KiAiVn7JzzCMJeksxNSXVfr1PpX8wD", ) @@ -297,7 +297,7 @@ mod message_receiver { vec![gateway::Node { owner: "foomp4".to_string(), host: "1.2.3.4".parse().unwrap(), - mix_host: "1.2.3.4:1789".parse().unwrap(), + mix_hosts: vec!["1.2.3.4:1789".parse().unwrap()], clients_ws_port: 9000, clients_wss_port: None, identity_key: identity::PublicKey::from_base58_string( diff --git a/common/topology-control/Cargo.toml b/common/topology-control/Cargo.toml new file mode 100644 index 00000000000..e4ea9a32e7e --- /dev/null +++ b/common/topology-control/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "nym-topology-control" +version = "0.1.0" +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = { workspace = true } +futures = { workspace = true } +log = { workspace = true } +rand = { version = "0.7.3", features = ["wasm-bindgen"] } +serde = { workspace = true, features = ["derive"] } +tap = "1.0.1" +url = { workspace = true, features = ["serde"] } +tokio = { workspace = true, features = ["macros"]} + +# internal + +nym-explorer-client = { path = "../../explorer-api/explorer-client" } +nym-sphinx = { path = "../nymsphinx" } +nym-topology = { path = "../topology", features = ["serializable"] } +nym-validator-client = { path = "../client-libs/validator-client", default-features = false } +nym-task = { path = "../task" } +nym-network-defaults = { path = "../network-defaults" } + + +[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream] +version = "0.1.11" +features = ["time"] + + +[target."cfg(target_arch = \"wasm32\")".dependencies.wasmtimer] +workspace = true +features = ["tokio"] + +[target."cfg(target_arch = \"wasm32\")".dependencies.gloo-timers] +version = "0.2.4" +features = ["futures"] \ No newline at end of file diff --git a/common/client-core/src/client/topology_control/accessor.rs b/common/topology-control/src/accessor.rs similarity index 92% rename from common/client-core/src/client/topology_control/accessor.rs rename to common/topology-control/src/accessor.rs index fcb272c9343..da463a4e91a 100644 --- a/common/client-core/src/client/topology_control/accessor.rs +++ b/common/topology-control/src/accessor.rs @@ -49,7 +49,7 @@ impl<'a> Deref for TopologyReadPermit<'a> { impl<'a> TopologyReadPermit<'a> { /// Using provided topology read permit, tries to get an immutable reference to the underlying /// topology. For obvious reasons the lifetime of the topology reference is bound to the permit. - pub(crate) fn try_get_valid_topology_ref( + pub fn try_get_valid_topology_ref( &'a self, ack_recipient: &Recipient, packet_recipient: Option<&Recipient>, @@ -83,6 +83,16 @@ impl<'a> TopologyReadPermit<'a> { Ok(topology) } + + pub fn try_get_raw_topology_ref(&'a self) -> Result<&'a NymTopology, NymTopologyError> { + // 1. Have we managed to get anything from the refresher, i.e. have the nym-api queries gone through? + let topology = self + .permit + .as_ref() + .ok_or(NymTopologyError::EmptyNetworkTopology)?; + + Ok(topology) + } } impl<'a> From>> for TopologyReadPermit<'a> { diff --git a/common/client-core/src/client/topology_control/geo_aware_provider.rs b/common/topology-control/src/geo_aware_provider.rs similarity index 93% rename from common/client-core/src/client/topology_control/geo_aware_provider.rs rename to common/topology-control/src/geo_aware_provider.rs index 71de0327b59..5ebf39bec2c 100644 --- a/common/client-core/src/client/topology_control/geo_aware_provider.rs +++ b/common/topology-control/src/geo_aware_provider.rs @@ -3,6 +3,7 @@ use std::{collections::HashMap, fmt}; use log::{debug, error, info}; use nym_explorer_client::{ExplorerClient, PrettyDetailedMixNodeBond}; use nym_network_defaults::var_names::EXPLORER_API; +use nym_sphinx::addressing::clients::Recipient; use nym_topology::{ nym_topology_from_detailed, provider_trait::{async_trait, TopologyProvider}, @@ -14,8 +15,6 @@ use serde::{Deserialize, Serialize}; use tap::TapOptional; use url::Url; -use crate::config::GroupBy; - const MIN_NODES_PER_LAYER: usize = 1; fn create_explorer_client() -> Option { @@ -38,6 +37,22 @@ fn create_explorer_client() -> Option { Some(client) } +#[allow(clippy::large_enum_variant)] +#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum GroupBy { + CountryGroup(CountryGroup), + NymAddress(Recipient), +} + +impl std::fmt::Display for GroupBy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + GroupBy::CountryGroup(group) => write!(f, "group: {}", group), + GroupBy::NymAddress(address) => write!(f, "address: {}", address), + } + } +} + #[derive(Copy, Clone, Hash, PartialEq, Eq, Serialize, Deserialize, Debug)] pub enum CountryGroup { Europe, @@ -279,6 +294,14 @@ impl GeoAwareTopologyProvider { Ok(gateways) => gateways, }; + let nodes_described = match self.validator_client.get_cached_described_nodes().await { + Err(err) => { + error!("failed to get described nodes - {err}"); + return None; + } + Ok(epoch) => epoch, + }; + // Also fetch mixnodes cached by explorer-api, with the purpose of getting their // geolocation. debug!("Fetching mixnodes from explorer-api..."); @@ -338,7 +361,7 @@ impl GeoAwareTopologyProvider { .filter(|m| filtered_mixnode_ids.contains(&m.mix_id())) .collect::>(); - let topology = nym_topology_from_detailed(mixnodes, gateways) + let topology = nym_topology_from_detailed(mixnodes, gateways, nodes_described) .filter_system_version(&self.client_version); // TODO: return real error type diff --git a/common/client-core/src/client/topology_control/mod.rs b/common/topology-control/src/lib.rs similarity index 96% rename from common/client-core/src/client/topology_control/mod.rs rename to common/topology-control/src/lib.rs index ae501289424..ead03a8c0b6 100644 --- a/common/client-core/src/client/topology_control/mod.rs +++ b/common/topology-control/src/lib.rs @@ -1,11 +1,11 @@ // Copyright 2021-2023 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 -use crate::spawn_future; -pub(crate) use accessor::{TopologyAccessor, TopologyReadPermit}; +pub use accessor::{TopologyAccessor, TopologyReadPermit}; use futures::StreamExt; use log::*; use nym_sphinx::addressing::nodes::NodeIdentity; +use nym_task::spawn; use nym_topology::provider_trait::TopologyProvider; use nym_topology::NymTopologyError; use std::time::Duration; @@ -16,9 +16,9 @@ use tokio::time::sleep; #[cfg(target_arch = "wasm32")] use wasmtimer::tokio::sleep; -mod accessor; +pub mod accessor; pub mod geo_aware_provider; -pub(crate) mod nym_api_provider; +pub mod nym_api_provider; // TODO: move it to config later const MAX_FAILURE_COUNT: usize = 10; @@ -142,7 +142,7 @@ impl TopologyRefresher { } pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) { - spawn_future(async move { + spawn(async move { debug!("Started TopologyRefresher with graceful shutdown support"); #[cfg(not(target_arch = "wasm32"))] diff --git a/common/client-core/src/client/topology_control/nym_api_provider.rs b/common/topology-control/src/nym_api_provider.rs similarity index 86% rename from common/client-core/src/client/topology_control/nym_api_provider.rs rename to common/topology-control/src/nym_api_provider.rs index 824294793bf..782404b253a 100644 --- a/common/client-core/src/client/topology_control/nym_api_provider.rs +++ b/common/topology-control/src/nym_api_provider.rs @@ -9,7 +9,7 @@ use rand::prelude::SliceRandom; use rand::thread_rng; use url::Url; -pub(crate) struct NymApiTopologyProvider { +pub struct NymApiTopologyProvider { validator_client: nym_validator_client::client::NymApiClient, nym_api_urls: Vec, @@ -18,7 +18,7 @@ pub(crate) struct NymApiTopologyProvider { } impl NymApiTopologyProvider { - pub(crate) fn new(mut nym_api_urls: Vec, client_version: String) -> Self { + pub fn new(mut nym_api_urls: Vec, client_version: String) -> Self { nym_api_urls.shuffle(&mut thread_rng()); NymApiTopologyProvider { @@ -77,13 +77,22 @@ impl NymApiTopologyProvider { Ok(gateways) => gateways, }; - let topology = nym_topology_from_detailed(mixnodes, gateways) + let nodes_described = match self.validator_client.get_cached_described_nodes().await { + Err(err) => { + error!("failed to get described nodes - {err}"); + return None; + } + Ok(epoch) => epoch, + }; + + let topology = nym_topology_from_detailed(mixnodes, gateways, nodes_described.clone()) .filter_system_version(&self.client_version); if let Err(err) = self.check_layer_distribution(&topology) { warn!("The current filtered active topology has extremely skewed layer distribution. It cannot be used: {err}"); self.use_next_nym_api(); - None + let empty_topology = NymTopology::empty().with_described_nodes(nodes_described); + Some(empty_topology) } else { Some(topology) } diff --git a/common/topology/src/gateway.rs b/common/topology/src/gateway.rs index 9d10b8d2a03..dc68e412e0d 100644 --- a/common/topology/src/gateway.rs +++ b/common/topology/src/gateway.rs @@ -46,9 +46,9 @@ pub enum GatewayConversionError { pub struct Node { pub owner: String, pub host: NetworkAddress, - // we're keeping this as separate resolved field since we do not want to be resolving the potential - // hostname every time we want to construct a path via this node - pub mix_host: SocketAddr, + // we're keeping all resolved IPs as a separate field since we do not want to be resolving the potential + // hostname every time we want to construct a path via this node. When we need one, we default to the first one + pub mix_hosts: Vec, // #[serde(alias = "clients_port")] pub clients_ws_port: u16, @@ -66,7 +66,7 @@ impl std::fmt::Debug for Node { f.debug_struct("gateway::Node") .field("host", &self.host) .field("owner", &self.owner) - .field("mix_host", &self.mix_host) + .field("mix_hosts", &self.mix_hosts) .field("clients_ws_port", &self.clients_ws_port) .field("clients_wss_port", &self.clients_wss_port) .field("identity_key", &self.identity_key.to_base58_string()) @@ -88,13 +88,12 @@ impl Node { pub fn extract_mix_host( host: &NetworkAddress, mix_port: u16, - ) -> Result { - Ok(host.to_socket_addrs(mix_port).map_err(|err| { - GatewayConversionError::InvalidAddress { + ) -> Result, GatewayConversionError> { + host.to_socket_addrs(mix_port) + .map_err(|err| GatewayConversionError::InvalidAddress { value: host.to_string(), source: err, - } - })?[0]) + }) } pub fn identity(&self) -> &NodeIdentity { @@ -135,7 +134,7 @@ impl filter::Versioned for Node { impl<'a> From<&'a Node> for SphinxNode { fn from(node: &'a Node) -> Self { - let node_address_bytes = NymNodeRoutingAddress::from(node.mix_host) + let node_address_bytes = NymNodeRoutingAddress::from(node.mix_hosts[0]) .try_into() .unwrap(); @@ -151,12 +150,12 @@ impl<'a> TryFrom<&'a GatewayBond> for Node { // try to completely resolve the host in the mix situation to avoid doing it every // single time we want to construct a path - let mix_host = Self::extract_mix_host(&host, bond.gateway.mix_port)?; + let mix_hosts = Self::extract_mix_host(&host, bond.gateway.mix_port)?; Ok(Node { owner: bond.owner.as_str().to_owned(), host, - mix_host, + mix_hosts, clients_ws_port: bond.gateway.clients_port, clients_wss_port: None, identity_key: identity::PublicKey::from_base58_string(&bond.gateway.identity_key)?, @@ -196,14 +195,17 @@ impl<'a> TryFrom<&'a DescribedGateway> for Node { // get ip from the self-reported values so we wouldn't need to do any hostname resolution // (which doesn't really work in wasm) - let mix_host = SocketAddr::new(ips[0], value.bond.gateway.mix_port); + let mix_hosts = ips + .iter() + .map(|ip| SocketAddr::new(*ip, value.bond.gateway.mix_port)) + .collect(); Ok(Node { owner: value.bond.owner.as_str().to_owned(), host, - mix_host, - clients_ws_port: self_described.mixnet_websockets.ws_port, - clients_wss_port: self_described.mixnet_websockets.wss_port, + mix_hosts, + clients_ws_port: self_described.mixnet_websockets.unwrap().ws_port, //SW gateway have that field + clients_wss_port: self_described.mixnet_websockets.unwrap().wss_port, //SW gateway have that field identity_key: identity::PublicKey::from_base58_string( &self_described.host_information.keys.ed25519, )?, diff --git a/common/topology/src/lib.rs b/common/topology/src/lib.rs index 039e30eb947..652bfd0370b 100644 --- a/common/topology/src/lib.rs +++ b/common/topology/src/lib.rs @@ -19,7 +19,7 @@ use std::str::FromStr; #[cfg(feature = "serializable")] use ::serde::{Deserialize, Deserializer, Serialize, Serializer}; -use nym_api_requests::models::DescribedGateway; +use nym_api_requests::models::{DescribedGateway, DescribedNymNode}; pub mod error; pub mod filter; @@ -115,14 +115,40 @@ pub type MixLayer = u8; pub struct NymTopology { mixes: BTreeMap>, gateways: Vec, + described_nodes: Vec, } impl NymTopology { - pub fn new(mixes: BTreeMap>, gateways: Vec) -> Self { - NymTopology { mixes, gateways } + pub fn new( + mixes: BTreeMap>, + gateways: Vec, + described_nodes: Vec, + ) -> Self { + NymTopology { + mixes: mixes.clone(), + gateways: gateways.clone(), + described_nodes: described_nodes.clone(), + } + } + + pub fn empty() -> Self { + NymTopology { + mixes: BTreeMap::new(), + gateways: Vec::new(), + described_nodes: Vec::new(), + } } - pub fn new_unordered(unordered_mixes: Vec, gateways: Vec) -> Self { + pub fn with_described_nodes(mut self, described_nodes: Vec) -> Self { + self.described_nodes = described_nodes; + self + } + + pub fn new_unordered( + unordered_mixes: Vec, + gateways: Vec, + described_nodes: Vec, + ) -> Self { let mut mixes = BTreeMap::new(); for node in unordered_mixes.into_iter() { let layer = node.layer as MixLayer; @@ -130,7 +156,7 @@ impl NymTopology { layer_entry.push(node) } - NymTopology { mixes, gateways } + NymTopology::new(mixes, gateways, described_nodes) } #[cfg(feature = "serializable")] @@ -142,8 +168,9 @@ impl NymTopology { pub fn from_detailed( mix_details: Vec, gateway_bonds: Vec, + described_nodes: Vec, ) -> Self { - nym_topology_from_detailed(mix_details, gateway_bonds) + nym_topology_from_detailed(mix_details, gateway_bonds, described_nodes) } pub fn find_mix(&self, mix_id: MixId) -> Option<&mix::Node> { @@ -168,6 +195,53 @@ impl NymTopology { None } + pub fn find_node_key_by_mix_host( + &self, + mix_host: SocketAddr, + check_port: bool, + ) -> Result, NymTopologyError> { + for node in self.described_nodes.iter() { + let (sphinx_key, socket_addresses, description) = match node { + DescribedNymNode::Gateway(g) => { + let sphinx_key = &g.bond.gateway.sphinx_key; + let gateway_node: Option = (&g.bond).try_into().ok(); + let mix_hosts = gateway_node.map(|node| node.mix_hosts); + let description = &g.self_described; + (sphinx_key, mix_hosts, description) + } + DescribedNymNode::Mixnode(m) => { + let sphinx_key = &m.bond.mix_node.sphinx_key; + let mix_node: Option = (&m.bond).try_into().ok(); + let mix_hosts = mix_node.map(|node| node.mix_hosts); + let description = &m.self_described; + (sphinx_key, mix_hosts, description) + } + }; + if let Some(sock_addr) = socket_addresses { + let existing_node = if check_port { + //Initiator side, we know the port should be correct as well + sock_addr.contains(&mix_host) + } else { + //responder side, we don't know the port. + //SW This can lead to some troubles if two nodes shares the same IP and one support Noise but not the other. This in only for the progressive update though + let ip_addresses = sock_addr.iter().map(|addr| addr.ip()).collect::>(); + ip_addresses.contains(&mix_host.ip()) + }; + if existing_node { + //we have our node + if let Some(d) = description { + if d.noise_information.supported { + return Ok(Some(sphinx_key.to_string())); + } + } + return Ok(None); + } + } + } + //didn't find that node + Err(NymTopologyError::NoMixnodesAvailable) + } + pub fn find_gateway(&self, gateway_identity: IdentityKeyRef) -> Option<&gateway::Node> { self.gateways .iter() @@ -379,6 +453,7 @@ impl NymTopology { NymTopology { mixes: self.mixes.filter_by_version(expected_mix_version), gateways: self.gateways.clone(), + described_nodes: self.described_nodes.clone(), } } } @@ -426,6 +501,7 @@ impl IntoGatewayNode for DescribedGateway { pub fn nym_topology_from_detailed( mix_details: Vec, gateway_bonds: Vec, + described_nodes: Vec, ) -> NymTopology where G: IntoGatewayNode, @@ -469,7 +545,7 @@ where } } - NymTopology::new(mixes, gateways) + NymTopology::new(mixes, gateways, described_nodes) } #[cfg(test)] @@ -489,7 +565,7 @@ mod converting_mixes_to_vec { mix_id: 42, owner: "N/A".to_string(), host: "3.3.3.3".parse().unwrap(), - mix_host: "3.3.3.3:1789".parse().unwrap(), + mix_hosts: vec!["3.3.3.3:1789".parse().unwrap()], identity_key: identity::PublicKey::from_base58_string( "3ebjp1Fb9hdcS1AR6AZihgeJiMHkB5jjJUsvqNnfQwU7", ) @@ -516,7 +592,7 @@ mod converting_mixes_to_vec { mixes.insert(1, vec![node1, node2]); mixes.insert(2, vec![node3]); - let topology = NymTopology::new(mixes, vec![]); + let topology = NymTopology::new(mixes, vec![], vec![]); let mixvec = topology.mixes_as_vec(); assert!(mixvec.iter().any(|node| node.owner == "N/A")); } @@ -528,7 +604,7 @@ mod converting_mixes_to_vec { #[test] fn returns_an_empty_vec() { - let topology = NymTopology::new(BTreeMap::new(), vec![]); + let topology = NymTopology::new(BTreeMap::new(), vec![], vec![]); let mixvec = topology.mixes_as_vec(); assert!(mixvec.is_empty()); } diff --git a/common/topology/src/mix.rs b/common/topology/src/mix.rs index 0481d5bc16d..ce9d59d0a15 100644 --- a/common/topology/src/mix.rs +++ b/common/topology/src/mix.rs @@ -34,9 +34,9 @@ pub struct Node { pub mix_id: MixId, pub owner: String, pub host: NetworkAddress, - // we're keeping this as separate resolved field since we do not want to be resolving the potential - // hostname every time we want to construct a path via this node - pub mix_host: SocketAddr, + // we're keeping all resolved IPs as a separate field since we do not want to be resolving the potential + // hostname every time we want to construct a path via this node. When we need one, we default to the first one + pub mix_hosts: Vec, pub identity_key: identity::PublicKey, pub sphinx_key: encryption::PublicKey, // TODO: or nymsphinx::PublicKey? both are x25519 pub layer: Layer, @@ -49,7 +49,7 @@ impl std::fmt::Debug for Node { .field("mix_id", &self.mix_id) .field("owner", &self.owner) .field("host", &self.host) - .field("mix_host", &self.mix_host) + .field("mix_hosts", &self.mix_hosts) .field("identity_key", &self.identity_key.to_base58_string()) .field("sphinx_key", &self.sphinx_key.to_base58_string()) .field("layer", &self.layer) @@ -70,13 +70,12 @@ impl Node { pub fn extract_mix_host( host: &NetworkAddress, mix_port: u16, - ) -> Result { - Ok(host.to_socket_addrs(mix_port).map_err(|err| { - MixnodeConversionError::InvalidAddress { + ) -> Result, MixnodeConversionError> { + host.to_socket_addrs(mix_port) + .map_err(|err| MixnodeConversionError::InvalidAddress { value: host.to_string(), source: err, - } - })?[0]) + }) } } @@ -89,7 +88,7 @@ impl filter::Versioned for Node { impl<'a> From<&'a Node> for SphinxNode { fn from(node: &'a Node) -> Self { - let node_address_bytes = NymNodeRoutingAddress::from(node.mix_host) + let node_address_bytes = NymNodeRoutingAddress::from(node.mix_hosts[0]) .try_into() .unwrap(); @@ -105,13 +104,13 @@ impl<'a> TryFrom<&'a MixNodeBond> for Node { // try to completely resolve the host in the mix situation to avoid doing it every // single time we want to construct a path - let mix_host = Self::extract_mix_host(&host, bond.mix_node.mix_port)?; + let mix_hosts = Self::extract_mix_host(&host, bond.mix_node.mix_port)?; Ok(Node { mix_id: bond.mix_id, owner: bond.owner.as_str().to_owned(), host, - mix_host, + mix_hosts, identity_key: identity::PublicKey::from_base58_string(&bond.mix_node.identity_key)?, sphinx_key: encryption::PublicKey::from_base58_string(&bond.mix_node.sphinx_key)?, layer: bond.layer, diff --git a/common/topology/src/serde.rs b/common/topology/src/serde.rs index 84770a48683..7d777bc6b6d 100644 --- a/common/topology/src/serde.rs +++ b/common/topology/src/serde.rs @@ -4,6 +4,7 @@ use crate::gateway::GatewayConversionError; use crate::mix::MixnodeConversionError; use crate::{gateway, mix, MixLayer, NymTopology}; +use nym_api_requests::models::DescribedNymNode; use nym_config::defaults::{DEFAULT_CLIENT_LISTENING_PORT, DEFAULT_MIX_LISTENING_PORT}; use nym_crypto::asymmetric::{encryption, identity}; use serde::{Deserialize, Serialize}; @@ -53,6 +54,12 @@ impl From for JsValue { pub struct SerializableNymTopology { pub mixnodes: BTreeMap>, pub gateways: Vec, + + //SW NOTE : make this an option to keep backwards compatibility. Noise with fallback needs that to work though + // Once fallback is removed, we only need a list of unfiltered nodes that can be constructed from mixnodes and gateways + // depending on the usecase of this struct + #[serde(alias = "described_nodes")] + pub described_nodes: Option>, //DescribedNymNode is already Serialize and Deserialize } impl TryFrom for NymTopology { @@ -76,7 +83,11 @@ impl TryFrom for NymTopology { .map(TryInto::try_into) .collect::>()?; - Ok(NymTopology::new(converted_mixes, gateways)) + Ok(NymTopology::new( + converted_mixes, + gateways, + value.described_nodes.unwrap_or_default(), + )) } } @@ -89,6 +100,7 @@ impl From for SerializableNymTopology { .map(|(&l, nodes)| (l, nodes.iter().map(Into::into).collect())) .collect(), gateways: value.gateways().iter().map(Into::into).collect(), + described_nodes: Some(value.described_nodes), } } } @@ -135,13 +147,13 @@ impl TryFrom for mix::Node { // try to completely resolve the host in the mix situation to avoid doing it every // single time we want to construct a path - let mix_host = mix::Node::extract_mix_host(&host, mix_port)?; + let mix_hosts = mix::Node::extract_mix_host(&host, mix_port)?; Ok(mix::Node { mix_id: value.mix_id, owner: value.owner, host, - mix_host, + mix_hosts, identity_key: identity::PublicKey::from_base58_string(&value.identity_key) .map_err(MixnodeConversionError::from)?, sphinx_key: encryption::PublicKey::from_base58_string(&value.sphinx_key) @@ -159,7 +171,7 @@ impl<'a> From<&'a mix::Node> for SerializableMixNode { mix_id: value.mix_id, owner: value.owner.clone(), host: value.host.to_string(), - mix_port: Some(value.mix_host.port()), + mix_port: Some(value.mix_hosts[0].port()), identity_key: value.identity_key.to_base58_string(), sphinx_key: value.sphinx_key.to_base58_string(), layer: value.layer.into(), @@ -181,8 +193,8 @@ pub struct SerializableGateway { // optional ip address in the case of host being a hostname that can't be resolved // (thank you wasm) #[cfg_attr(feature = "wasm-serde-types", tsify(optional))] - #[serde(alias = "explicit_ip")] - pub explicit_ip: Option, + #[serde(alias = "explicit_ips")] + pub explicit_ips: Option>, #[cfg_attr(feature = "wasm-serde-types", tsify(optional))] #[serde(alias = "mix_port")] @@ -221,8 +233,11 @@ impl TryFrom for gateway::Node { // try to completely resolve the host in the mix situation to avoid doing it every // single time we want to construct a path - let mix_host = if let Some(explicit_ip) = value.explicit_ip { - SocketAddr::new(explicit_ip, mix_port) + let mix_hosts = if let Some(explicit_ips) = value.explicit_ips { + explicit_ips + .iter() + .map(|explicit_ip| SocketAddr::new(*explicit_ip, mix_port)) + .collect() } else { gateway::Node::extract_mix_host(&host, mix_port)? }; @@ -230,7 +245,7 @@ impl TryFrom for gateway::Node { Ok(gateway::Node { owner: value.owner, host, - mix_host, + mix_hosts, clients_ws_port, clients_wss_port: value.clients_wss_port, identity_key: identity::PublicKey::from_base58_string(&value.identity_key) @@ -247,8 +262,8 @@ impl<'a> From<&'a gateway::Node> for SerializableGateway { SerializableGateway { owner: value.owner.clone(), host: value.host.to_string(), - explicit_ip: Some(value.mix_host.ip()), - mix_port: Some(value.mix_host.port()), + explicit_ips: Some(value.mix_hosts.iter().map(|addr| addr.ip()).collect()), + mix_port: Some(value.mix_hosts[0].port()), clients_ws_port: Some(value.clients_ws_port), clients_wss_port: value.clients_wss_port, identity_key: value.identity_key.to_base58_string(), diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index 90ef9195ca3..bb1ed87928b 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -78,6 +78,9 @@ nym-task = { path = "../common/task" } nym-types = { path = "../common/types" } nym-validator-client = { path = "../common/client-libs/validator-client" } nym-ip-packet-router = { path = "../service-providers/ip-packet-router" } +nym-topology-control = { path = "../common/topology-control" } +nym-topology = { path = "../common/topology" } +nym-noise = { path = "../common/nymnoise" } nym-wireguard = { path = "../common/wireguard", optional = true } defguard_wireguard_rs = { git = "https://github.com/neacsu/wireguard-rs.git", rev = "c2cd0c1119f699f4bc43f5e6ffd6fc242caa42ed", optional = true } diff --git a/gateway/src/commands/run.rs b/gateway/src/commands/run.rs index 76f7629de5d..244b637caca 100644 --- a/gateway/src/commands/run.rs +++ b/gateway/src/commands/run.rs @@ -58,6 +58,10 @@ pub struct Run { // the alias here is included for backwards compatibility (1.1.4 and before) nym_apis: Option>, + /// Path to .json file containing custom network specification. + #[arg(long, group = "network", hide = true)] + custom_mixnet: Option, + /// Comma separated list of endpoints of the validator #[arg( long, @@ -127,11 +131,6 @@ pub struct Run { )] medium_toggle: bool, - /// Path to .json file containing custom network specification. - /// Only usable when local network requester is enabled. - #[arg(long, group = "network", hide = true)] - custom_mixnet: Option, - /// Specifies whether this network requester will run using the default ExitPolicy /// as opposed to the allow list. /// Note: this setting will become the default in the future releases. @@ -249,12 +248,17 @@ pub async fn execute(args: Run) -> anyhow::Result<()> { } let node_details = node_details(&config)?; - let gateway = - crate::node::create_gateway(config, Some(nr_opts), Some(ip_opts), custom_mixnet).await?; + let mut gateway = + crate::node::create_gateway(config, Some(nr_opts), Some(ip_opts), custom_mixnet.clone()) + .await?; eprintln!( "\nTo bond your gateway you will need to install the Nym wallet, go to https://nymtech.net/get-involved and select the Download button.\n\ Select the correct version and install it to your machine. You will need to provide some of the following: \n "); output.to_stdout(&node_details); + if let Some(custom_mixnet) = custom_mixnet { + gateway = gateway.with_stored_topology(custom_mixnet)?; + } + gateway.run().await } diff --git a/gateway/src/config/mod.rs b/gateway/src/config/mod.rs index b872a1dffd0..8d5a6ee5d96 100644 --- a/gateway/src/config/mod.rs +++ b/gateway/src/config/mod.rs @@ -38,6 +38,9 @@ const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_milli const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500); const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 2000; +const DEFAULT_TOPOLOGY_REFRESH_RATE: Duration = Duration::from_secs(5 * 60); // every 5min +const DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT: Duration = Duration::from_millis(5_000); + const DEFAULT_STORED_MESSAGE_FILENAME_LENGTH: u16 = 16; const DEFAULT_MESSAGE_RETRIEVAL_LIMIT: i64 = 100; @@ -109,6 +112,9 @@ pub struct Config { #[serde(default)] pub debug: Debug, + + #[serde(default)] + pub topology: Topology, } impl NymConfigTemplate for Config { @@ -135,6 +141,7 @@ impl Config { ip_packet_router: Default::default(), logging: Default::default(), debug: Default::default(), + topology: Default::default(), } } @@ -442,3 +449,33 @@ impl Default for Debug { } } } + +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] +#[serde(default, deny_unknown_fields)] +pub struct Topology { + /// The uniform delay every which clients are querying the directory server + /// to try to obtain a compatible network topology to send sphinx packets through. + #[serde(with = "humantime_serde")] + pub topology_refresh_rate: Duration, + + /// During topology refresh, test packets are sent through every single possible network + /// path. This timeout determines waiting period until it is decided that the packet + /// did not reach its destination. + #[serde(with = "humantime_serde")] + pub topology_resolution_timeout: Duration, + + /// Specifies whether the client should not refresh the network topology after obtaining + /// the first valid instance. + /// Supersedes `topology_refresh_rate_ms`. + pub disable_refreshing: bool, +} + +impl Default for Topology { + fn default() -> Self { + Topology { + topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE, + topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT, + disable_refreshing: false, + } + } +} diff --git a/gateway/src/config/old_config_v1_1_31.rs b/gateway/src/config/old_config_v1_1_31.rs index 18841a49936..0b86b216188 100644 --- a/gateway/src/config/old_config_v1_1_31.rs +++ b/gateway/src/config/old_config_v1_1_31.rs @@ -165,6 +165,9 @@ impl From for Config { message_retrieval_limit: value.debug.message_retrieval_limit, use_legacy_framed_packet_version: value.debug.use_legacy_framed_packet_version, }, + // \/ ADDED + topology: Default::default(), + // /\ ADDED } } } diff --git a/gateway/src/error.rs b/gateway/src/error.rs index 61f75981efe..6e0b1e44564 100644 --- a/gateway/src/error.rs +++ b/gateway/src/error.rs @@ -39,6 +39,15 @@ pub(crate) enum GatewayError { source: io::Error, }, + #[error( + "failed to load custom topology using path '{}'. detailed message: {source}", file_path.display() + )] + CustomTopologyLoadFailure { + file_path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error( "failed to load config file for network requester (gateway-id: '{id}') using path '{}'. detailed message: {source}", path.display() )] diff --git a/gateway/src/http/mod.rs b/gateway/src/http/mod.rs index 7b63a522fbc..4c68e55ece8 100644 --- a/gateway/src/http/mod.rs +++ b/gateway/src/http/mod.rs @@ -12,6 +12,7 @@ use nym_network_requester::RequestFilter; use nym_node::error::NymNodeError; use nym_node::http::api::api_requests; use nym_node::http::api::api_requests::v1::network_requester::exit_policy::models::UsedExitPolicy; +use nym_node::http::api::api_requests::v1::node::models::NoiseInformation; use nym_node::http::api::api_requests::SignedHostInformation; use nym_node::http::router::WireguardAppState; use nym_node::wireguard::types::GatewayClientRegistry; @@ -260,6 +261,7 @@ impl<'a> HttpApiBuilder<'a> { self.sphinx_keypair.public_key(), self.identity_keypair, )?, + NoiseInformation { supported: true }, //Now we can enable that ) .with_gateway(load_gateway_details(self.gateway_config)?) .with_landing_page_assets(self.gateway_config.http.landing_page_assets_path.as_ref()); diff --git a/gateway/src/node/mixnet_handling/receiver/connection_handler.rs b/gateway/src/node/mixnet_handling/receiver/connection_handler.rs index d07dc6489eb..426fa63d623 100644 --- a/gateway/src/node/mixnet_handling/receiver/connection_handler.rs +++ b/gateway/src/node/mixnet_handling/receiver/connection_handler.rs @@ -9,15 +9,20 @@ use crate::node::storage::Storage; use futures::channel::mpsc::SendError; use futures::StreamExt; use log::*; +use nym_crypto::asymmetric::encryption; use nym_mixnet_client::forwarder::MixForwardingSender; use nym_mixnode_common::packet_processor::processor::ProcessedFinalHop; +use nym_noise::upgrade_noise_responder_with_topology; use nym_sphinx::forwarding::packet::MixPacket; use nym_sphinx::framing::codec::NymCodec; use nym_sphinx::framing::packet::FramedNymPacket; use nym_sphinx::DestinationAddressBytes; use nym_task::TaskClient; +use nym_topology_control::accessor::TopologyAccessor; +use nym_validator_client::NymApiClient; use std::collections::HashMap; use std::net::SocketAddr; +use std::sync::Arc; use thiserror::Error; use tokio::net::TcpStream; use tokio_util::codec::Framed; @@ -40,6 +45,9 @@ pub(crate) struct ConnectionHandler { active_clients_store: ActiveClientsStore, storage: St, ack_sender: MixForwardingSender, + topology_access: TopologyAccessor, + api_client: NymApiClient, + local_identity: Arc, } impl Clone for ConnectionHandler { @@ -58,6 +66,9 @@ impl Clone for ConnectionHandler { active_clients_store: self.active_clients_store.clone(), storage: self.storage.clone(), ack_sender: self.ack_sender.clone(), + topology_access: self.topology_access.clone(), + api_client: self.api_client.clone(), + local_identity: self.local_identity.clone(), } } } @@ -68,6 +79,9 @@ impl ConnectionHandler { storage: St, ack_sender: MixForwardingSender, active_clients_store: ActiveClientsStore, + topology_access: TopologyAccessor, + api_client: NymApiClient, + local_identity: Arc, ) -> Self { ConnectionHandler { packet_processor, @@ -75,6 +89,9 @@ impl ConnectionHandler { storage, active_clients_store, ack_sender, + topology_access, + api_client, + local_identity, } } @@ -205,7 +222,38 @@ impl ConnectionHandler { ) { debug!("Starting connection handler for {:?}", remote); shutdown.mark_as_success(); - let mut framed_conn = Framed::new(conn, NymCodec); + + let Some(topology) = self.topology_access.current_topology().await else { + error!("Cannot perform Noise handshake to {remote}, due to topology error"); + return; + }; + + let epoch_id = match self.api_client.get_current_epoch_id().await { + Ok(id) => id, + Err(err) => { + error!("Cannot perform Noise handshake to {remote}, due to epoch id error - {err}"); + return; + } + }; + + let noise_stream = match upgrade_noise_responder_with_topology( + conn, + Default::default(), + &topology, + epoch_id, + self.local_identity.public_key(), + self.local_identity.private_key(), + ) + .await + { + Ok(noise_stream) => noise_stream, + Err(err) => { + error!("Failed to perform Noise handshake with {remote} - {err}"); + return; + } + }; + debug!("Noise responder handshake completed for {:?}", remote); + let mut framed_conn = Framed::new(noise_stream, NymCodec); while !shutdown.is_shutdown() { tokio::select! { biased; diff --git a/gateway/src/node/mod.rs b/gateway/src/node/mod.rs index 9fa6d1af8e6..62565af6edb 100644 --- a/gateway/src/node/mod.rs +++ b/gateway/src/node/mod.rs @@ -7,7 +7,7 @@ use crate::commands::helpers::{ override_ip_packet_router_config, override_network_requester_config, OverrideIpPacketRouterConfig, OverrideNetworkRequesterConfig, }; -use crate::config::Config; +use crate::config::{Config, Topology}; use crate::error::GatewayError; use crate::http::HttpApiBuilder; use crate::node::client_handling::active_clients::ActiveClientsStore; @@ -31,13 +31,21 @@ use nym_network_requester::{LocalGateway, NRServiceProviderBuilder, RequestFilte use nym_node::wireguard::types::GatewayClientRegistry; use nym_statistics_common::collector::StatisticsSender; use nym_task::{TaskClient, TaskManager}; +use nym_topology::provider_trait::TopologyProvider; +use nym_topology::HardcodedTopologyProvider; +use nym_topology_control::accessor::TopologyAccessor; +use nym_topology_control::nym_api_provider::NymApiTopologyProvider; +use nym_topology_control::TopologyRefresher; +use nym_topology_control::TopologyRefresherConfig; +use nym_validator_client::NymApiClient; use nym_validator_client::{nyxd, DirectSigningHttpRpcNyxdClient}; use rand::seq::SliceRandom; use rand::thread_rng; use std::error::Error; use std::net::SocketAddr; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; +use url::Url; pub(crate) mod client_handling; pub(crate) mod helpers; @@ -131,6 +139,7 @@ pub(crate) struct Gateway { storage: St, client_registry: Arc, + custom_topology_provider: Option>, } impl Gateway { @@ -149,6 +158,7 @@ impl Gateway { network_requester_opts, ip_packet_router_opts, client_registry: Arc::new(DashMap::new()), + custom_topology_provider: None, }) } @@ -169,13 +179,28 @@ impl Gateway { sphinx_keypair: Arc::new(sphinx_keypair), storage, client_registry: Arc::new(DashMap::new()), + custom_topology_provider: None, } } + pub fn with_stored_topology>(mut self, file: P) -> Result { + self.custom_topology_provider = Some(Box::new( + HardcodedTopologyProvider::new_from_file(&file).map_err(|source| { + GatewayError::CustomTopologyLoadFailure { + file_path: file.as_ref().to_path_buf(), + source, + } + })?, + )); + Ok(self) + } + fn start_mix_socket_listener( &self, ack_sender: MixForwardingSender, active_clients_store: ActiveClientsStore, + topology_access: TopologyAccessor, + api_client: NymApiClient, shutdown: TaskClient, ) where St: Storage + Clone + 'static, @@ -190,6 +215,9 @@ impl Gateway { self.storage.clone(), ack_sender, active_clients_store, + topology_access, + api_client, + Arc::clone(&self.sphinx_keypair), ); let listening_address = SocketAddr::new( @@ -243,15 +271,26 @@ impl Gateway { ); } - fn start_packet_forwarder(&self, shutdown: TaskClient) -> MixForwardingSender { + fn start_packet_forwarder( + &self, + topology_access: TopologyAccessor, + api_client: NymApiClient, + shutdown: TaskClient, + ) -> MixForwardingSender { info!("Starting mix packet forwarder..."); - let (mut packet_forwarder, packet_sender) = PacketForwarder::new( + let forwarder_config = nym_mixnet_client::client::Config::new( self.config.debug.packet_forwarding_initial_backoff, self.config.debug.packet_forwarding_maximum_backoff, self.config.debug.initial_connection_timeout, self.config.debug.maximum_connection_buffer_size, self.config.debug.use_legacy_framed_packet_version, + ); + let (mut packet_forwarder, packet_sender) = PacketForwarder::new( + forwarder_config, + topology_access, + api_client, + Arc::clone(&self.sphinx_keypair), shutdown, ); @@ -425,6 +464,50 @@ impl Gateway { .map_err(Into::into) } + fn setup_topology_provider( + custom_provider: Option>, + nym_api_urls: Vec, + ) -> Box { + // if no custom provider was ... provided ..., create one using nym-api + custom_provider.unwrap_or(Box::new(NymApiTopologyProvider::new( + nym_api_urls, + env!("CARGO_PKG_VERSION").to_string(), + ))) + } + + // future responsible for periodically polling directory server and updating + // the current global view of topology + async fn start_topology_refresher( + topology_provider: Box, + topology_config: Topology, + topology_accessor: TopologyAccessor, + mut shutdown: TaskClient, + ) { + let topology_refresher_config = + TopologyRefresherConfig::new(topology_config.topology_refresh_rate); + + let mut topology_refresher = TopologyRefresher::new( + topology_refresher_config, + topology_accessor, + topology_provider, + ); + // before returning, block entire runtime to refresh the current network view so that any + // components depending on topology would see a non-empty view + info!("Obtaining initial network topology"); + topology_refresher.try_refresh().await; + + if topology_config.disable_refreshing { + // if we're not spawning the refresher, don't cause shutdown immediately + info!("The topology refesher is not going to be started"); + shutdown.mark_as_success(); + } else { + // don't spawn the refresher if we don't want to be refreshing the topology. + // only use the initial values obtained + info!("Starting topology refresher..."); + topology_refresher.start_with_shutdown(shutdown); + } + } + async fn check_if_bonded(&self) -> Result { // TODO: if anything, this should be getting data directly from the contract // as opposed to the validator API @@ -442,7 +525,7 @@ impl Gateway { })) } - pub async fn run(self) -> anyhow::Result<()> + pub async fn run(mut self) -> anyhow::Result<()> where St: Storage + Clone + 'static, { @@ -459,13 +542,35 @@ impl Gateway { CoconutVerifier::new(nyxd_client).await }?; - let mix_forwarding_channel = - self.start_packet_forwarder(shutdown.subscribe().named("PacketForwarder")); + let topology_provider = Self::setup_topology_provider( + self.custom_topology_provider.take(), + self.config.get_nym_api_endpoints(), + ); + + let shared_topology_access = TopologyAccessor::new(); + + Self::start_topology_refresher( + topology_provider, + self.config.topology, + shared_topology_access.clone(), + shutdown.subscribe(), + ) + .await; + + let random_api_client = self.random_api_client()?; + + let mix_forwarding_channel = self.start_packet_forwarder( + shared_topology_access.clone(), + random_api_client.clone(), + shutdown.subscribe().named("PacketForwarder"), + ); let active_clients_store = ActiveClientsStore::new(); self.start_mix_socket_listener( mix_forwarding_channel.clone(), active_clients_store.clone(), + shared_topology_access, + random_api_client, shutdown.subscribe().named("mixnet_handling::Listener"), ); diff --git a/mixnode/Cargo.toml b/mixnode/Cargo.toml index 589feffebca..f3c21396d7d 100644 --- a/mixnode/Cargo.toml +++ b/mixnode/Cargo.toml @@ -58,9 +58,11 @@ nym-pemstore = { path = "../common/pemstore", version = "0.3.0" } nym-task = { path = "../common/task" } nym-types = { path = "../common/types" } nym-topology = { path = "../common/topology" } +nym-topology-control = { path = "../common/topology-control" } nym-validator-client = { path = "../common/client-libs/validator-client" } nym-bin-common = { path = "../common/bin-common", features = ["output_format"] } cpu-cycles = { path = "../cpu-cycles", optional = true } +nym-noise = { path = "../common/nymnoise" } [dev-dependencies] tokio = { workspace = true, features = [ diff --git a/mixnode/src/commands/run.rs b/mixnode/src/commands/run.rs index c4ab5125fbb..bf51dc17782 100644 --- a/mixnode/src/commands/run.rs +++ b/mixnode/src/commands/run.rs @@ -11,6 +11,7 @@ use nym_bin_common::output_format::OutputFormat; use nym_config::helpers::SPECIAL_ADDRESSES; use nym_validator_client::nyxd; use std::net::IpAddr; +use std::path::PathBuf; #[derive(Args, Clone)] pub(crate) struct Run { /// Id of the nym-mixnode we want to run @@ -42,6 +43,10 @@ pub(crate) struct Run { #[clap(long, alias = "validators", value_delimiter = ',')] nym_apis: Option>, + /// Path to .json file containing custom network specification. + #[arg(long, group = "network", hide = true)] + pub custom_mixnet: Option, + #[clap(short, long, default_value_t = OutputFormat::default())] output: OutputFormat, } @@ -91,6 +96,10 @@ pub(crate) async fn execute(args: &Run) -> anyhow::Result<()> { Select the correct version and install it to your machine. You will need to provide the following: \n "); mixnode.print_node_details(args.output); + if let Some(custom_mixnet) = &args.custom_mixnet { + mixnode = mixnode.with_stored_topology(custom_mixnet)?; + } + mixnode.run().await?; Ok(()) } diff --git a/mixnode/src/config/mod.rs b/mixnode/src/config/mod.rs index a1ab25a752c..49b4e3ddfc7 100644 --- a/mixnode/src/config/mod.rs +++ b/mixnode/src/config/mod.rs @@ -47,6 +47,9 @@ const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_milli const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500); const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 2000; +const DEFAULT_TOPOLOGY_REFRESH_RATE: Duration = Duration::from_secs(5 * 60); // every 5min +const DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT: Duration = Duration::from_millis(5_000); + /// Derive default path to mixnodes's config directory. /// It should get resolved to `$HOME/.nym/mixnodes//config` pub fn default_config_directory>(id: P) -> PathBuf { @@ -107,6 +110,9 @@ pub struct Config { #[serde(default)] pub debug: Debug, + + #[serde(default)] + pub topology: Topology, } impl NymConfigTemplate for Config { @@ -132,6 +138,7 @@ impl Config { verloc: Default::default(), logging: Default::default(), debug: Default::default(), + topology: Default::default(), } } @@ -339,3 +346,33 @@ impl Default for Debug { } } } + +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] +#[serde(default, deny_unknown_fields)] +pub struct Topology { + /// The uniform delay every which clients are querying the directory server + /// to try to obtain a compatible network topology to send sphinx packets through. + #[serde(with = "humantime_serde")] + pub topology_refresh_rate: Duration, + + /// During topology refresh, test packets are sent through every single possible network + /// path. This timeout determines waiting period until it is decided that the packet + /// did not reach its destination. + #[serde(with = "humantime_serde")] + pub topology_resolution_timeout: Duration, + + /// Specifies whether the client should not refresh the network topology after obtaining + /// the first valid instance. + /// Supersedes `topology_refresh_rate_ms`. + pub disable_refreshing: bool, +} + +impl Default for Topology { + fn default() -> Self { + Topology { + topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE, + topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT, + disable_refreshing: false, + } + } +} diff --git a/mixnode/src/config/old_config_v1_1_32.rs b/mixnode/src/config/old_config_v1_1_32.rs index a17cdeadcdc..0170aa766d4 100644 --- a/mixnode/src/config/old_config_v1_1_32.rs +++ b/mixnode/src/config/old_config_v1_1_32.rs @@ -109,6 +109,9 @@ impl From for Config { verloc: value.verloc.into(), logging: value.logging, debug: value.debug.into(), + // \/ ADDED + topology: Default::default(), + // /\ ADDED } } } diff --git a/mixnode/src/error.rs b/mixnode/src/error.rs index 57c45947478..34c57b1f0a4 100644 --- a/mixnode/src/error.rs +++ b/mixnode/src/error.rs @@ -34,6 +34,15 @@ pub enum MixnodeError { source: io::Error, }, + #[error( + "failed to load custom topology using path '{}'. detailed message: {source}", file_path.display() + )] + CustomTopologyLoadFailure { + file_path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error( "failed to save config file for id {id} using path '{}'. detailed message: {source}", path.display() )] diff --git a/mixnode/src/node/http/mod.rs b/mixnode/src/node/http/mod.rs index 5147664c702..bc43a90c6be 100644 --- a/mixnode/src/node/http/mod.rs +++ b/mixnode/src/node/http/mod.rs @@ -11,6 +11,7 @@ use nym_bin_common::bin_info_owned; use nym_crypto::asymmetric::{encryption, identity}; use nym_node::error::NymNodeError; use nym_node::http::api::api_requests; +use nym_node::http::api::api_requests::v1::node::models::NoiseInformation; use nym_node::http::api::api_requests::SignedHostInformation; use nym_task::TaskClient; @@ -93,6 +94,7 @@ impl<'a> HttpApiBuilder<'a> { self.sphinx_keypair.public_key(), self.identity_keypair, )?, + NoiseInformation { supported: true }, //Now we can enable that ) .with_mixnode(load_mixnode_details(self.mixnode_config)?) .with_landing_page_assets(self.mixnode_config.http.landing_page_assets_path.as_ref()); diff --git a/mixnode/src/node/listener/connection_handler/mod.rs b/mixnode/src/node/listener/connection_handler/mod.rs index c16a700bfd8..a1c671efe60 100644 --- a/mixnode/src/node/listener/connection_handler/mod.rs +++ b/mixnode/src/node/listener/connection_handler/mod.rs @@ -9,12 +9,17 @@ use crate::node::TaskClient; use futures::StreamExt; use log::debug; use log::{error, info, warn}; +use nym_crypto::asymmetric::encryption; use nym_mixnode_common::measure; +use nym_noise::upgrade_noise_responder_with_topology; use nym_sphinx::forwarding::packet::MixPacket; use nym_sphinx::framing::codec::NymCodec; use nym_sphinx::framing::packet::FramedNymPacket; use nym_sphinx::Delay as SphinxDelay; +use nym_topology_control::accessor::TopologyAccessor; +use nym_validator_client::NymApiClient; use std::net::SocketAddr; +use std::sync::Arc; use tokio::net::TcpStream; use tokio::time::Instant; use tokio_util::codec::Framed; @@ -28,16 +33,25 @@ pub(crate) mod packet_processing; pub(crate) struct ConnectionHandler { packet_processor: PacketProcessor, delay_forwarding_channel: PacketDelayForwardSender, + topology_access: TopologyAccessor, + api_client: NymApiClient, + local_identity: Arc, } impl ConnectionHandler { pub(crate) fn new( packet_processor: PacketProcessor, delay_forwarding_channel: PacketDelayForwardSender, + topology_access: TopologyAccessor, + api_client: NymApiClient, + local_identity: Arc, ) -> Self { ConnectionHandler { packet_processor, delay_forwarding_channel, + topology_access, + api_client, + local_identity, } } @@ -88,8 +102,40 @@ impl ConnectionHandler { mut shutdown: TaskClient, ) { debug!("Starting connection handler for {:?}", remote); + shutdown.mark_as_success(); - let mut framed_conn = Framed::new(conn, NymCodec); + + let Some(topology) = self.topology_access.current_topology().await else { + error!("Cannot perform Noise handshake to {remote}, due to topology error"); + return; + }; + + let epoch_id = match self.api_client.get_current_epoch_id().await { + Ok(id) => id, + Err(err) => { + error!("Cannot perform Noise handshake to {remote}, due to epoch id error - {err}"); + return; + } + }; + + let noise_stream = match upgrade_noise_responder_with_topology( + conn, + Default::default(), + &topology, + epoch_id, + self.local_identity.public_key(), + self.local_identity.private_key(), + ) + .await + { + Ok(noise_stream) => noise_stream, + Err(err) => { + error!("Failed to perform Noise handshake with {remote} - {err}"); + return; + } + }; + debug!("Noise responder handshake completed for {:?}", remote); + let mut framed_conn = Framed::new(noise_stream, NymCodec); while !shutdown.is_shutdown() { tokio::select! { biased; @@ -121,10 +167,7 @@ impl ConnectionHandler { } } - info!( - "Closing connection from {:?}", - framed_conn.into_inner().peer_addr() - ); + info!("Closing connection from {:?}", remote); log::trace!("ConnectionHandler: Exiting"); } } diff --git a/mixnode/src/node/mod.rs b/mixnode/src/node/mod.rs index 1667b108fd6..587e1c61c2b 100644 --- a/mixnode/src/node/mod.rs +++ b/mixnode/src/node/mod.rs @@ -1,7 +1,7 @@ // Copyright 2020-2023 - Nym Technologies SA // SPDX-License-Identifier: GPL-3.0-only -use crate::config::Config; +use crate::config::{Config, Topology}; use crate::error::MixnodeError; use crate::node::helpers::{load_identity_keys, load_sphinx_keys}; use crate::node::http::legacy::verloc::VerlocState; @@ -18,11 +18,20 @@ use nym_bin_common::version_checker::parse_version; use nym_crypto::asymmetric::{encryption, identity}; use nym_mixnode_common::verloc::{self, AtomicVerlocResult, VerlocMeasurer}; use nym_task::{TaskClient, TaskManager}; +use nym_topology::provider_trait::TopologyProvider; +use nym_topology::HardcodedTopologyProvider; +use nym_topology_control::accessor::TopologyAccessor; +use nym_topology_control::nym_api_provider::NymApiTopologyProvider; +use nym_topology_control::TopologyRefresher; +use nym_topology_control::TopologyRefresherConfig; +use nym_validator_client::NymApiClient; use rand::seq::SliceRandom; use rand::thread_rng; use std::net::SocketAddr; +use std::path::Path; use std::process; use std::sync::Arc; +use url::Url; pub(crate) mod helpers; mod http; @@ -37,6 +46,7 @@ pub struct MixNode { descriptor: NodeDescription, identity_keypair: Arc, sphinx_keypair: Arc, + custom_topology_provider: Option>, } impl MixNode { @@ -46,6 +56,7 @@ impl MixNode { identity_keypair: Arc::new(load_identity_keys(&config)?), sphinx_keypair: Arc::new(load_sphinx_keys(&config)?), config, + custom_topology_provider: None, }) } @@ -68,6 +79,18 @@ impl MixNode { println!("{}", output.format(&node_details)); } + pub fn with_stored_topology>(mut self, file: P) -> Result { + self.custom_topology_provider = Some(Box::new( + HardcodedTopologyProvider::new_from_file(&file).map_err(|source| { + MixnodeError::CustomTopologyLoadFailure { + file_path: file.as_ref().to_path_buf(), + source, + } + })?, + )); + Ok(self) + } + fn start_http_api( &self, atomic_verloc_result: AtomicVerlocResult, @@ -101,6 +124,8 @@ impl MixNode { &self, node_stats_update_sender: node_statistics::UpdateSender, delay_forwarding_channel: PacketDelayForwardSender, + topology_access: TopologyAccessor, + api_client: NymApiClient, shutdown: TaskClient, ) { info!("Starting socket listener..."); @@ -108,7 +133,13 @@ impl MixNode { let packet_processor = PacketProcessor::new(self.sphinx_keypair.private_key(), node_stats_update_sender); - let connection_handler = ConnectionHandler::new(packet_processor, delay_forwarding_channel); + let connection_handler = ConnectionHandler::new( + packet_processor, + delay_forwarding_channel, + topology_access, + api_client, + Arc::clone(&self.sphinx_keypair), + ); let listening_address = SocketAddr::new( self.config.mixnode.listening_address, @@ -121,6 +152,8 @@ impl MixNode { fn start_packet_delay_forwarder( &mut self, node_stats_update_sender: node_statistics::UpdateSender, + topology_access: TopologyAccessor, + api_client: NymApiClient, shutdown: TaskClient, ) -> PacketDelayForwardSender { info!("Starting packet delay-forwarder..."); @@ -134,7 +167,12 @@ impl MixNode { ); let mut packet_forwarder = DelayForwarder::new( - nym_mixnet_client::Client::new(client_config), + nym_mixnet_client::Client::new( + client_config, + topology_access, + api_client, + Arc::clone(&self.sphinx_keypair), + ), node_stats_update_sender, shutdown, ); @@ -185,6 +223,50 @@ impl MixNode { atomic_verloc_results } + fn setup_topology_provider( + custom_provider: Option>, + nym_api_urls: Vec, + ) -> Box { + // if no custom provider was ... provided ..., create one using nym-api + custom_provider.unwrap_or(Box::new(NymApiTopologyProvider::new( + nym_api_urls, + env!("CARGO_PKG_VERSION").to_string(), + ))) + } + + // future responsible for periodically polling directory server and updating + // the current global view of topology + async fn start_topology_refresher( + topology_provider: Box, + topology_config: Topology, + topology_accessor: TopologyAccessor, + mut shutdown: TaskClient, + ) { + let topology_refresher_config = + TopologyRefresherConfig::new(topology_config.topology_refresh_rate); + + let mut topology_refresher = TopologyRefresher::new( + topology_refresher_config, + topology_accessor, + topology_provider, + ); + // before returning, block entire runtime to refresh the current network view so that any + // components depending on topology would see a non-empty view + info!("Obtaining initial network topology"); + topology_refresher.try_refresh().await; + + if topology_config.disable_refreshing { + // if we're not spawning the refresher, don't cause shutdown immediately + info!("The topology refesher is not going to be started"); + shutdown.mark_as_success(); + } else { + // don't spawn the refresher if we don't want to be refreshing the topology. + // only use the initial values obtained + info!("Starting topology refresher..."); + topology_refresher.start_with_shutdown(shutdown); + } + } + fn random_api_client(&self) -> nym_validator_client::NymApiClient { let endpoints = self.config.get_nym_api_endpoints(); let nym_api = endpoints @@ -231,13 +313,34 @@ impl MixNode { let (node_stats_pointer, node_stats_update_sender) = self .start_node_stats_controller(shutdown.subscribe().named("node_statistics::Controller")); + + let topology_provider = Self::setup_topology_provider( + self.custom_topology_provider.take(), + self.config.get_nym_api_endpoints(), + ); + let shared_topology_access = TopologyAccessor::new(); + Self::start_topology_refresher( + topology_provider, + self.config.topology, + shared_topology_access.clone(), + shutdown.subscribe().named("TopologyRefresher"), + ) + .await; + + let random_api_client = self.random_api_client(); + let delay_forwarding_channel = self.start_packet_delay_forwarder( node_stats_update_sender.clone(), + shared_topology_access.clone(), + random_api_client.clone(), shutdown.subscribe().named("DelayForwarder"), ); + self.start_socket_listener( node_stats_update_sender, delay_forwarding_channel, + shared_topology_access, + random_api_client, shutdown.subscribe().named("Listener"), ); let atomic_verloc_results = diff --git a/nym-api/nym-api-requests/src/models.rs b/nym-api/nym-api-requests/src/models.rs index 142a607e5a4..4613b6d3aa6 100644 --- a/nym-api/nym-api-requests/src/models.rs +++ b/nym-api/nym-api-requests/src/models.rs @@ -7,10 +7,12 @@ use nym_mixnet_contract_common::mixnode::MixNodeDetails; use nym_mixnet_contract_common::reward_params::{Performance, RewardingParams}; use nym_mixnet_contract_common::rewarding::RewardEstimate; use nym_mixnet_contract_common::{ - GatewayBond, IdentityKey, Interval, MixId, MixNode, Percent, RewardedSetNodeStatus, + GatewayBond, IdentityKey, Interval, MixId, MixNode, MixNodeBond, Percent, RewardedSetNodeStatus, }; use nym_node_requests::api::v1::gateway::models::WebSockets; -use nym_node_requests::api::v1::node::models::{BinaryBuildInformationOwned, HostInformation}; +use nym_node_requests::api::v1::node::models::{ + BinaryBuildInformationOwned, HostInformation, NoiseInformation, +}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; @@ -370,7 +372,10 @@ pub struct NymNodeDescription { pub ip_packet_router: Option, // for now we only care about their ws/wss situation, nothing more - pub mixnet_websockets: WebSockets, + #[serde(default)] + pub mixnet_websockets: Option, + + pub noise_information: NoiseInformation, } #[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema)] @@ -388,6 +393,38 @@ impl From for DescribedGateway { } } +#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema)] +pub struct DescribedMixnode { + pub bond: MixNodeBond, + pub self_described: Option, +} + +impl From for DescribedMixnode { + fn from(bond: MixNodeBond) -> Self { + DescribedMixnode { + bond, + self_described: None, + } + } +} +#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema)] +pub enum DescribedNymNode { + Gateway(DescribedGateway), + Mixnode(DescribedMixnode), +} + +impl From for DescribedNymNode { + fn from(value: GatewayBond) -> Self { + Self::Gateway(value.into()) + } +} + +impl From for DescribedNymNode { + fn from(value: MixNodeDetails) -> Self { + Self::Mixnode(value.bond_information.into()) + } +} + #[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema)] pub struct NetworkRequesterDetails { /// address of the embedded network requester diff --git a/nym-api/src/node_describe_cache/mixnode.rs b/nym-api/src/node_describe_cache/mixnode.rs new file mode 100644 index 00000000000..8dee83b1cd4 --- /dev/null +++ b/nym-api/src/node_describe_cache/mixnode.rs @@ -0,0 +1,93 @@ +// Copyright 2023 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use nym_api_requests::models::NymNodeDescription; +use nym_config::defaults::DEFAULT_HTTP_API_LISTENING_PORT; +use nym_contracts_common::IdentityKey; +use nym_mixnet_contract_common::MixNode; +use nym_node_requests::api::client::NymNodeApiClientExt; + +use super::NodeDescribeCacheError; + +//this is a copy of try_get_client but for mixnode, to be deleted after smoosh probably +async fn try_get_client( + mixnode: &MixNode, +) -> Result { + let mixnode_host = &mixnode.host; + + // first try the standard port in case the operator didn't put the node behind the proxy, + // then default https (443) + // finally default http (80) + let addresses_to_try = vec![ + format!("http://{mixnode_host}:{DEFAULT_HTTP_API_LISTENING_PORT}"), + format!("https://{mixnode_host}"), + format!("http://{mixnode_host}"), + ]; + + for address in addresses_to_try { + // if provided host was malformed, no point in continuing + let client = match nym_node_requests::api::Client::new_url(address, None) { + Ok(client) => client, + Err(err) => { + return Err(NodeDescribeCacheError::MalformedHost { + host: mixnode_host.clone(), + gateway: mixnode.identity_key.clone(), + source: err, + }); + } + }; + if let Ok(health) = client.get_health().await { + if health.status.is_up() { + return Ok(client); + } + } + } + + Err(NodeDescribeCacheError::NoHttpPortsAvailable { + host: mixnode_host.clone(), + gateway: mixnode.identity_key.clone(), + }) +} + +pub(crate) async fn get_mixnode_description( + mixnode: MixNode, +) -> Result<(IdentityKey, NymNodeDescription), NodeDescribeCacheError> { + let client = try_get_client(&mixnode).await?; + + let host_info = + client + .get_host_information() + .await + .map_err(|err| NodeDescribeCacheError::ApiFailure { + gateway: mixnode.identity_key.clone(), + source: err, + })?; + + if !host_info.verify_host_information() { + return Err(NodeDescribeCacheError::MissignedHostInformation { + gateway: mixnode.identity_key, + }); + } + + let build_info = + client + .get_build_information() + .await + .map_err(|err| NodeDescribeCacheError::ApiFailure { + gateway: mixnode.identity_key.clone(), + source: err, + })?; + + let noise_info = client.get_noise_information().await.unwrap_or_default(); + + let description = NymNodeDescription { + host_information: host_info.data, + build_information: build_info, + network_requester: None, + ip_packet_router: None, + mixnet_websockets: None, + noise_information: noise_info, + }; + + Ok((mixnode.identity_key, description)) +} diff --git a/nym-api/src/node_describe_cache/mod.rs b/nym-api/src/node_describe_cache/mod.rs index ded2fa7454a..0372d0a1315 100644 --- a/nym-api/src/node_describe_cache/mod.rs +++ b/nym-api/src/node_describe_cache/mod.rs @@ -7,6 +7,7 @@ use crate::support::caching::refresher::{CacheItemProvider, CacheRefresher}; use crate::support::config; use crate::support::config::DEFAULT_NODE_DESCRIBE_BATCH_SIZE; use futures::{stream, StreamExt}; +use mixnode::get_mixnode_description; use nym_api_requests::models::{ IpPacketRouterDetails, NetworkRequesterDetails, NymNodeDescription, }; @@ -17,6 +18,8 @@ use nym_node_requests::api::client::{NymNodeApiClientError, NymNodeApiClientExt} use std::collections::HashMap; use thiserror::Error; +pub mod mixnode; + // type alias for ease of use pub type DescribedNodes = HashMap; @@ -180,12 +183,15 @@ async fn get_gateway_description( None }; + let noise_info = client.get_noise_information().await.unwrap_or_default(); + let description = NymNodeDescription { host_information: host_info.data, build_information: build_info, network_requester, ip_packet_router, - mixnet_websockets: websockets, + mixnet_websockets: Some(websockets), + noise_information: noise_info, }; Ok((gateway.identity_key, description)) @@ -202,16 +208,17 @@ impl CacheItemProvider for NodeDescriptionProvider { async fn try_refresh(&self) -> Result { let gateways = self.contract_cache.gateways_all().await; + let mixnodes = self.contract_cache.mixnodes_all().await; // let guard = self.network_gateways.get().await?; // let gateways = &*guard; - if gateways.is_empty() { + if gateways.is_empty() && mixnodes.is_empty() { return Ok(HashMap::new()); } // TODO: somehow bypass the 'higher-ranked lifetime error' and remove that redundant clone - let websockets = stream::iter( + let mut websockets = stream::iter( gateways // .deref() // .clone() @@ -232,6 +239,27 @@ impl CacheItemProvider for NodeDescriptionProvider { .collect::>() .await; + let mixnodes_websockets = stream::iter( + mixnodes + .into_iter() + .map(|detail| detail.bond_information.mix_node) + .map(get_mixnode_description), + ) + .buffer_unordered(self.batch_size) + .filter_map(|res| async move { + match res { + Ok((identity, description)) => Some((identity, description)), + Err(err) => { + debug!("{err}"); + None + } + } + }) + .collect::>() + .await; + + websockets.extend(mixnodes_websockets); + Ok(websockets) } } diff --git a/nym-api/src/nym_nodes/mod.rs b/nym-api/src/nym_nodes/mod.rs index 54e28db4213..828340484db 100644 --- a/nym-api/src/nym_nodes/mod.rs +++ b/nym-api/src/nym_nodes/mod.rs @@ -11,6 +11,7 @@ pub(crate) mod routes; /// Merges the routes with http information and returns it to Rocket for serving pub(crate) fn nym_node_routes(settings: &OpenApiSettings) -> (Vec, OpenApi) { openapi_get_routes_spec![ - settings: routes::get_gateways_described + settings: routes::get_gateways_described, + routes::get_nym_nodes_described, ] } diff --git a/nym-api/src/nym_nodes/routes.rs b/nym-api/src/nym_nodes/routes.rs index 1df02508449..88a2a051580 100644 --- a/nym-api/src/nym_nodes/routes.rs +++ b/nym-api/src/nym_nodes/routes.rs @@ -4,7 +4,7 @@ use crate::node_describe_cache::DescribedNodes; use crate::nym_contract_cache::cache::NymContractCache; use crate::support::caching::cache::SharedCache; -use nym_api_requests::models::DescribedGateway; +use nym_api_requests::models::{DescribedGateway, DescribedMixnode, DescribedNymNode}; use rocket::serde::json::Json; use rocket::State; use rocket_okapi::openapi; @@ -42,3 +42,59 @@ pub async fn get_gateways_described( .collect(), ) } + +#[openapi(tag = "Nym Nodes")] +#[get("/nym-nodes/described")] +pub async fn get_nym_nodes_described( + contract_cache: &State, + describe_cache: &State>, +) -> Json> { + let gateways = contract_cache.gateways_all().await; + let mixnodes = contract_cache.mixnodes_all().await; + if gateways.is_empty() && mixnodes.is_empty() { + return Json(Vec::new()); + } + + // if the self describe cache is unavailable, well, don't attach describe data + let Ok(self_descriptions) = describe_cache.get().await else { + return Json( + gateways + .into_iter() + .map(Into::into) + .chain(mixnodes.into_iter().map(Into::into)) + .collect(), + ); + }; + + // TODO: this is extremely inefficient, I'm merely copying existing stuff + // it shouldn't be too much of a problem until we go ahead with directory v3 / the smoosh 2: electric smoosharoo, + // but at that point (I hope) the whole caching situation should get refactored + let gateways_described: Vec = gateways + .into_iter() + .map(|bond| { + DescribedNymNode::Gateway(DescribedGateway { + self_described: self_descriptions.deref().get(bond.identity()).cloned(), + bond, + }) + }) + .collect(); + + let mixnodes_described: Vec = mixnodes + .into_iter() + .map(|detail| { + DescribedNymNode::Mixnode(DescribedMixnode { + self_described: self_descriptions + .deref() + .get(detail.bond_information.identity()) + .cloned(), + bond: detail.bond_information, + }) + }) + .collect(); + Json( + gateways_described + .into_iter() + .chain(mixnodes_described.into_iter()) + .collect(), + ) +} diff --git a/nym-connect/desktop/Cargo.lock b/nym-connect/desktop/Cargo.lock index cdd144c1754..8ebf49b432f 100644 --- a/nym-connect/desktop/Cargo.lock +++ b/nym-connect/desktop/Cargo.lock @@ -3769,6 +3769,7 @@ dependencies = [ "nym-sphinx", "nym-task", "nym-topology", + "nym-topology-control", "nym-validator-client", "rand 0.7.3", "reqwest", @@ -3872,6 +3873,7 @@ dependencies = [ "nym-sphinx", "nym-task", "nym-topology", + "nym-topology-control", "nym-validator-client", "pretty_env_logger", "rand 0.7.3", @@ -4486,6 +4488,29 @@ dependencies = [ "thiserror", ] +[[package]] +name = "nym-topology-control" +version = "0.1.0" +dependencies = [ + "async-trait", + "futures", + "gloo-timers", + "log", + "nym-explorer-client", + "nym-network-defaults", + "nym-sphinx", + "nym-task", + "nym-topology", + "nym-validator-client", + "rand 0.7.3", + "serde", + "tap", + "tokio", + "tokio-stream", + "url", + "wasmtimer", +] + [[package]] name = "nym-validator-client" version = "0.1.0" diff --git a/nym-connect/desktop/src-tauri/Cargo.toml b/nym-connect/desktop/src-tauri/Cargo.toml index 193d3a53a3d..3aa3dcc907d 100644 --- a/nym-connect/desktop/src-tauri/Cargo.toml +++ b/nym-connect/desktop/src-tauri/Cargo.toml @@ -61,6 +61,7 @@ nym-socks5-client-core = { path = "../../../common/socks5-client-core" } nym-sphinx = { path = "../../../common/nymsphinx" } nym-task = { path = "../../../common/task" } nym-topology = { path = "../../../common/topology" } +nym-topology-control = { path = "../../../common/topology-control" } nym-validator-client = { path = "../../../common/client-libs/validator-client" } [dev-dependencies] diff --git a/nym-connect/desktop/src-tauri/src/tasks.rs b/nym-connect/desktop/src-tauri/src/tasks.rs index d4264f22740..e76e9bc32f4 100644 --- a/nym-connect/desktop/src-tauri/src/tasks.rs +++ b/nym-connect/desktop/src-tauri/src/tasks.rs @@ -4,12 +4,13 @@ use nym_client_core::{ client::base_client::storage::{ gateway_details::GatewayDetailsStore, MixnetClientStorage, OnDiskPersistent, }, - config::{GroupBy, TopologyStructure}, + config::TopologyStructure, error::ClientCoreStatusMessage, }; use nym_socks5_client_core::{NymClient as Socks5NymClient, Socks5ControlMessageSender}; use nym_sphinx::params::PacketSize; use nym_task::manager::TaskStatus; +use nym_topology_control::geo_aware_provider::GroupBy; use std::sync::Arc; use tap::TapFallible; use tokio::sync::RwLock; diff --git a/nym-node/nym-node-requests/src/api/client.rs b/nym-node/nym-node-requests/src/api/client.rs index c4c89e2514a..8fcba9251fb 100644 --- a/nym-node/nym-node-requests/src/api/client.rs +++ b/nym-node/nym-node-requests/src/api/client.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::api::v1::gateway::models::WebSockets; -use crate::api::v1::node::models::SignedHostInformation; +use crate::api::v1::node::models::{NoiseInformation, SignedHostInformation}; use crate::api::ErrorResponse; use crate::routes; use async_trait::async_trait; @@ -60,6 +60,11 @@ pub trait NymNodeApiClientExt: ApiClient { .await } + async fn get_noise_information(&self) -> Result { + self.get_json_from(routes::api::v1::noise_info_absolute()) + .await + } + async fn post_gateway_register_client( &self, client_message: &ClientMessage, diff --git a/nym-node/nym-node-requests/src/api/v1/node/models.rs b/nym-node/nym-node-requests/src/api/v1/node/models.rs index 3b79601a00d..962bc108ecd 100644 --- a/nym-node/nym-node-requests/src/api/v1/node/models.rs +++ b/nym-node/nym-node-requests/src/api/v1/node/models.rs @@ -42,3 +42,9 @@ pub struct HostKeys { /// Currently it corresponds to either mixnode's or gateway's key. pub x25519: String, } + +#[derive(Default, Serialize, Deserialize, Debug, Clone, JsonSchema)] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] +pub struct NoiseInformation { + pub supported: bool, +} diff --git a/nym-node/nym-node-requests/src/lib.rs b/nym-node/nym-node-requests/src/lib.rs index 9bb345eca99..ed6aa75b603 100644 --- a/nym-node/nym-node-requests/src/lib.rs +++ b/nym-node/nym-node-requests/src/lib.rs @@ -28,6 +28,7 @@ pub mod routes { pub const ROLES: &str = "/roles"; pub const BUILD_INFO: &str = "/build-information"; pub const HOST_INFO: &str = "/host-information"; + pub const NOISE_INFO: &str = "/noise"; pub const HEALTH: &str = "/health"; pub const GATEWAY: &str = "/gateway"; @@ -45,6 +46,7 @@ pub mod routes { absolute_route!(mixnode_absolute, v1_absolute(), MIXNODE); absolute_route!(network_requester_absolute, v1_absolute(), NETWORK_REQUESTER); absolute_route!(ip_packet_router_absolute, v1_absolute(), IP_PACKET_ROUTER); + absolute_route!(noise_info_absolute, v1_absolute(), NOISE_INFO); absolute_route!(swagger_absolute, v1_absolute(), SWAGGER); pub mod gateway { diff --git a/nym-node/src/http/router/api/v1/node/mod.rs b/nym-node/src/http/router/api/v1/node/mod.rs index 620a446090e..56e2df1723f 100644 --- a/nym-node/src/http/router/api/v1/node/mod.rs +++ b/nym-node/src/http/router/api/v1/node/mod.rs @@ -3,6 +3,7 @@ use crate::http::api::v1::node::build_information::build_information; use crate::http::api::v1::node::host_information::host_information; +use crate::http::api::v1::node::noise_information::noise_information; use crate::http::api::v1::node::roles::roles; use axum::routing::get; use axum::Router; @@ -11,12 +12,14 @@ use nym_node_requests::routes::api::v1; pub mod build_information; pub mod host_information; +pub mod noise_information; pub mod roles; #[derive(Debug, Clone)] pub struct Config { pub build_information: models::BinaryBuildInformationOwned, pub host_information: models::SignedHostInformation, + pub noise_information: models::NoiseInformation, pub roles: models::NodeRoles, } @@ -43,4 +46,11 @@ pub(super) fn routes(config: Config) -> Router move |query| host_information(host_info, query) }), ) + .route( + v1::NOISE_INFO, + get({ + let noise_info = config.noise_information; + move |query| noise_information(noise_info, query) + }), + ) } diff --git a/nym-node/src/http/router/api/v1/node/noise_information.rs b/nym-node/src/http/router/api/v1/node/noise_information.rs new file mode 100644 index 00000000000..d5eb88b5e9e --- /dev/null +++ b/nym-node/src/http/router/api/v1/node/noise_information.rs @@ -0,0 +1,30 @@ +// Copyright 2023 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use crate::http::api::{FormattedResponse, OutputParams}; +use axum::extract::Query; +use nym_node_requests::api::v1::node::models::NoiseInformation; + +/// Returns host information of this node. +#[utoipa::path( + get, + path = "/noise", + context_path = "/api/v1", + tag = "Node", + responses( + (status = 200, content( + ("application/json" = NoiseInformation), + ("application/yaml" = NoiseInformation) + )) + ), + params(OutputParams) +)] +pub(crate) async fn noise_information( + host_information: NoiseInformation, + Query(output): Query, +) -> NoiseInformationResponse { + let output = output.output.unwrap_or_default(); + output.to_response(host_information) +} + +pub type NoiseInformationResponse = FormattedResponse; diff --git a/nym-node/src/http/router/mod.rs b/nym-node/src/http/router/mod.rs index 71de3c3c8da..fd7949887be 100644 --- a/nym-node/src/http/router/mod.rs +++ b/nym-node/src/http/router/mod.rs @@ -12,7 +12,7 @@ use nym_node_requests::api::v1::ip_packet_router::models::IpPacketRouter; use nym_node_requests::api::v1::mixnode::models::Mixnode; use nym_node_requests::api::v1::network_requester::exit_policy::models::UsedExitPolicy; use nym_node_requests::api::v1::network_requester::models::NetworkRequester; -use nym_node_requests::api::v1::node::models; +use nym_node_requests::api::v1::node::models::{self, NoiseInformation}; use nym_node_requests::api::SignedHostInformation; use nym_node_requests::routes; use std::net::SocketAddr; @@ -33,6 +33,7 @@ impl Config { pub fn new( build_information: models::BinaryBuildInformationOwned, host_information: SignedHostInformation, + noise_information: NoiseInformation, ) -> Self { Config { landing: Default::default(), @@ -41,6 +42,7 @@ impl Config { node: api::v1::node::Config { build_information, host_information, + noise_information, roles: Default::default(), }, gateway: Default::default(), diff --git a/scripts/build_topology.py b/scripts/build_topology.py index 14ec7b7d540..a808620cc3e 100644 --- a/scripts/build_topology.py +++ b/scripts/build_topology.py @@ -14,6 +14,18 @@ def add_mixnode(base_network, base_dir, mix_id): base_network["mixnodes"][str(mix_id)][0]["layer"] = mix_id base_network["mixnodes"][str(mix_id)][0]["mix_id"] = mix_id base_network["mixnodes"][str(mix_id)][0]["owner"] = "whatever" + + #described_node + template = mixnode_template() + template["Mixnode"]["bond"]["mix_node"]["identity_key"] = mix_data["identity_key"] + template["Mixnode"]["bond"]["mix_node"]["sphinx_key"] = mix_data["sphinx_key"] + template["Mixnode"]["bond"]["mix_node"]["mix_port"] = mix_data["mix_port"] + template["Mixnode"]["bond"]["mix_node"]["host"] = mix_data["bind_address"] + template["Mixnode"]["bond"]["layer"] = mix_id + template["Mixnode"]["bond"]["mix_id"] = mix_id + template["Mixnode"]["self_described"]["host_information"]["keys"]["ed25519"] = mix_data["identity_key"] + template["Mixnode"]["self_described"]["host_information"]["keys"]["x25519"] = mix_data["sphinx_key"] + base_network["described_nodes"][mix_id] = template return base_network @@ -27,6 +39,17 @@ def add_gateway(base_network, base_dir): # base_network["gateways"][0]["version"] = gateway_data["version"] base_network["gateways"][0]["host"] = gateway_data["bind_address"] base_network["gateways"][0]["owner"] = "whatever" + + #described_node + template = gateway_template() + template["Gateway"]["bond"]["gateway"]["identity_key"] = gateway_data["identity_key"] + template["Gateway"]["bond"]["gateway"]["sphinx_key"] = gateway_data["sphinx_key"] + template["Gateway"]["bond"]["gateway"]["mix_port"] = gateway_data["mix_port"] + template["Gateway"]["bond"]["gateway"]["clients_port"] = gateway_data["clients_port"] + template["Gateway"]["bond"]["gateway"]["host"] = gateway_data["bind_address"] + template["Gateway"]["self_described"]["host_information"]["keys"]["ed25519"] = gateway_data["identity_key"] + template["Gateway"]["self_described"]["host_information"]["keys"]["x25519"] = gateway_data["sphinx_key"] + base_network["described_nodes"][0] = template return base_network @@ -37,7 +60,8 @@ def main(args): "2": [{}], "3": [{}], }, - "gateways": [{}] + "gateways": [{}], + "described_nodes":[{}, {}, {}, {}] } base_dir = args[0] @@ -50,5 +74,117 @@ def main(args): json.dump(base_network, out, indent=2) +def gateway_template(): + return {"Gateway": { + "bond": { + "pledge_amount": { + "denom": "unym", + "amount": "0" + }, + "owner": "whatever", + "block_height": 0, + "gateway": { + "host": "TO_BE_FILLED", + "mix_port": "TO_BE_FILLED", + "clients_port": "TO_BE_FILLED", + "location": "whatever", + "sphinx_key": "TO_BE_FILLED", + "identity_key": "TO_BE_FILLED", + "version": "whatever", + }, + "proxy": None, + }, + "self_described": { + "host_information": { + "ip_address": [ + "0.0.0.0" + ], + "hostname": None, + "keys": { + "ed25519": "TO_BE_FILLED", + "x25519": "TO_BE_FILLED" + } + }, + "build_information": { + "binary_name": "whatever", + "build_timestamp": "whatever", + "build_version": "whatever", + "commit_sha": "whatever", + "commit_timestamp": "whatever", + "commit_branch": "whatever", + "rustc_version": "whatever", + "rustc_channel": "whatever", + "cargo_profile": "whatever" + }, + "network_requester": { + "address": "none", + "uses_exit_policy": True + }, + "mixnet_websockets": { + "ws_port": 9000, + "wss_port": None + }, + "noise_information": { + "supported": True + } + } + }} + +def mixnode_template(): + return { + "Mixnode": { + "bond": { + "mix_id": "TO_BE_FILLED", + "owner": "whatever", + "original_pledge": { + "denom": "unym", + "amount": "0" + }, + "layer": "TO_BE_FILLED", + "mix_node": { + "host": "TO_BE_FILLED", + "mix_port": "TO_BE_FILLED", + "verloc_port": 1790, + "http_api_port": 8000, + "sphinx_key": "TO_BE_FILLED", + "identity_key": "TO_BE_FILLED", + "version": "whatever" + }, + "proxy": None, + "bonding_height": 0, + "is_unbonding": False + }, + "self_described": { + "host_information": { + "ip_address": [ + "0.0.0.0" + ], + "hostname": None, + "keys": { + "ed25519": "TO_BE_FILLED", + "x25519": "TO_BE_FILLED" + } + }, + "build_information": { + "binary_name": "whatever", + "build_timestamp": "whatever", + "build_version": "whatever", + "commit_sha": "whatever", + "commit_timestamp": "whatever", + "commit_branch": "whatever", + "rustc_version": "whatever", + "rustc_channel": "whatever", + "cargo_profile": "whatever" + }, + "network_requester": None, + "ip_packet_router": None, + "mixnet_websockets": None, + "noise_information": { + "supported": True + } + } + } + } + if __name__ == '__main__': main(sys.argv[1:]) diff --git a/scripts/localnet_start.sh b/scripts/localnet_start.sh index 1a821c31e12..679edd7fd56 100755 --- a/scripts/localnet_start.sh +++ b/scripts/localnet_start.sh @@ -42,10 +42,10 @@ echo "the full network file is located at $networkfile" echo "starting the mixnet..." tmux start-server -tmux new-session -d -s localnet -n Mixnet -d "/usr/bin/env sh -c \" cargo run --release --bin nym-mixnode -- run --id mix1-$suffix \"" -tmux split-window -t localnet:0 "/usr/bin/env sh -c \" cargo run --release --bin nym-mixnode -- run --id mix2-$suffix \"" -tmux split-window -t localnet:0 "/usr/bin/env sh -c \" cargo run --release --bin nym-mixnode -- run --id mix3-$suffix \"" -tmux split-window -t localnet:0 "/usr/bin/env sh -c \" cargo run --release --bin nym-gateway -- run --id gateway-$suffix --local \"" +tmux new-session -d -s localnet -n Mixnet -d "/usr/bin/env sh -c \" cargo run --release --bin nym-mixnode -- run --id mix1-$suffix --custom-mixnet \"$networkfile\" \"" +tmux split-window -t localnet:0 "/usr/bin/env sh -c \" cargo run --release --bin nym-mixnode -- run --id mix2-$suffix --custom-mixnet \"$networkfile\" \"" +tmux split-window -t localnet:0 "/usr/bin/env sh -c \" cargo run --release --bin nym-mixnode -- run --id mix3-$suffix --custom-mixnet \"$networkfile\" \"" +tmux split-window -t localnet:0 "/usr/bin/env sh -c \" cargo run --release --bin nym-gateway -- run --id gateway-$suffix --local --custom-mixnet \"$networkfile\" \"" while ! nc -z localhost 9000; do echo "waiting for nym-gateway to launch on port 9000..." diff --git a/sdk/rust/nym-sdk/Cargo.toml b/sdk/rust/nym-sdk/Cargo.toml index c0a0960f79e..5dbd239da40 100644 --- a/sdk/rust/nym-sdk/Cargo.toml +++ b/sdk/rust/nym-sdk/Cargo.toml @@ -20,6 +20,7 @@ nym-network-defaults = { path = "../../../common/network-defaults" } nym-sphinx = { path = "../../../common/nymsphinx" } nym-task = { path = "../../../common/task" } nym-topology = { path = "../../../common/topology" } +nym-topology-control = { path = "../../../common/topology-control" } nym-socks5-client-core = { path = "../../../common/socks5-client-core" } nym-validator-client = { path = "../../../common/client-libs/validator-client", features = ["http-client"] } nym-socks5-requests = { path = "../../../common/socks5/requests" } diff --git a/sdk/rust/nym-sdk/examples/manually_overwrite_topology.rs b/sdk/rust/nym-sdk/examples/manually_overwrite_topology.rs index 175a64f4cdc..5d8eb4e0a22 100644 --- a/sdk/rust/nym-sdk/examples/manually_overwrite_topology.rs +++ b/sdk/rust/nym-sdk/examples/manually_overwrite_topology.rs @@ -23,7 +23,7 @@ async fn main() { mix_id: 63, owner: "n1k52k5n45cqt5qpjh8tcwmgqm0wkt355yy0g5vu".to_string(), host: "172.105.92.48".parse().unwrap(), - mix_host: "172.105.92.48:1789".parse().unwrap(), + mix_hosts: vec!["172.105.92.48:1789".parse().unwrap()], identity_key: "GLdR2NRVZBiCoCbv4fNqt9wUJZAnNjGXHkx3TjVAUzrK" .parse() .unwrap(), @@ -40,7 +40,7 @@ async fn main() { mix_id: 23, owner: "n1fzv4jc7fanl9s0qj02ge2ezk3kts545kjtek47".to_string(), host: "178.79.143.65".parse().unwrap(), - mix_host: "178.79.143.65:1789".parse().unwrap(), + mix_hosts: vec!["178.79.143.65:1789".parse().unwrap()], identity_key: "4Yr4qmEHd9sgsuQ83191FR2hD88RfsbMmB4tzhhZWriz" .parse() .unwrap(), @@ -57,7 +57,7 @@ async fn main() { mix_id: 66, owner: "n1ae2pjd7q9p0dea65pqkvcm4x9s264v4fktpyru".to_string(), host: "139.162.247.97".parse().unwrap(), - mix_host: "139.162.247.97:1789".parse().unwrap(), + mix_hosts: vec!["139.162.247.97:1789".parse().unwrap()], identity_key: "66UngapebhJRni3Nj52EW1qcNsWYiuonjkWJzHFsmyYY" .parse() .unwrap(), diff --git a/sdk/rust/nym-sdk/src/mixnet.rs b/sdk/rust/nym-sdk/src/mixnet.rs index b54ef462ee4..e0daf655028 100644 --- a/sdk/rust/nym-sdk/src/mixnet.rs +++ b/sdk/rust/nym-sdk/src/mixnet.rs @@ -54,9 +54,8 @@ pub use nym_client_core::{ fs_backend::Backend as ReplyStorage, CombinedReplyStorage, Empty as EmptyReplyStorage, ReplyStorageBackend, }, - topology_control::geo_aware_provider::{CountryGroup, GeoAwareTopologyProvider}, }, - config::{GatewayEndpointConfig, GroupBy}, + config::GatewayEndpointConfig, }; pub use nym_credential_storage::{ ephemeral_storage::EphemeralStorage as EphemeralCredentialStorage, @@ -73,6 +72,9 @@ pub use nym_sphinx::{ receiver::ReconstructedMessage, }; pub use nym_topology::{provider_trait::TopologyProvider, NymTopology}; +pub use nym_topology_control::geo_aware_provider::{ + CountryGroup, GeoAwareTopologyProvider, GroupBy, +}; pub use paths::StoragePaths; pub use socks5_client::Socks5MixnetClient; pub use traits::MixnetMessageSender;