From e8248dbff3cb18b0b237c20571ef9b7e7e7542bf Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Tue, 6 Feb 2024 09:45:37 +0100 Subject: [PATCH 1/9] move topology control into its own crate --- Cargo.lock | 28 +++++++++++++-- clients/socks5/Cargo.toml | 1 + clients/socks5/src/commands/mod.rs | 4 +-- clients/socks5/src/commands/run.rs | 2 +- common/client-core/Cargo.toml | 1 + .../client-core/src/client/base_client/mod.rs | 8 ++--- .../src/client/cover_traffic_stream.rs | 2 +- common/client-core/src/client/mod.rs | 1 - .../real_messages_control/message_handler.rs | 2 +- .../src/client/real_messages_control/mod.rs | 2 +- .../real_traffic_stream.rs | 2 +- common/client-core/src/config/mod.rs | 24 ++----------- .../src/config/old_config_v1_1_30.rs | 6 ++-- common/topology-control/Cargo.toml | 34 +++++++++++++++++++ .../src}/accessor.rs | 2 +- .../src}/geo_aware_provider.rs | 19 +++++++++-- .../mod.rs => topology-control/src/lib.rs} | 6 ++-- .../src}/nym_api_provider.rs | 0 gateway/Cargo.toml | 2 +- gateway/src/node/mod.rs | 8 ++--- mixnode/Cargo.toml | 2 +- mixnode/src/node/mod.rs | 8 ++--- sdk/rust/nym-sdk/Cargo.toml | 1 + sdk/rust/nym-sdk/src/mixnet.rs | 6 ++-- 24 files changed, 114 insertions(+), 57 deletions(-) create mode 100644 common/topology-control/Cargo.toml rename common/{client-core/src/client/topology_control => topology-control/src}/accessor.rs (99%) rename common/{client-core/src/client/topology_control => topology-control/src}/geo_aware_provider.rs (95%) rename common/{client-core/src/client/topology_control/mod.rs => topology-control/src/lib.rs} (97%) rename common/{client-core/src/client/topology_control => topology-control/src}/nym_api_provider.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 75a4c2ed9ae..e3d04138478 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6240,6 +6240,7 @@ dependencies = [ "nym-sphinx", "nym-task", "nym-topology", + "nym-topology-control", "nym-validator-client", "rand 0.7.3", "reqwest", @@ -6547,7 +6548,6 @@ dependencies = [ "log", "nym-api-requests", "nym-bin-common", - "nym-client-core", "nym-coconut-interface", "nym-config", "nym-credentials", @@ -6564,6 +6564,7 @@ dependencies = [ "nym-statistics-common", "nym-task", "nym-topology", + "nym-topology-control", "nym-types", "nym-validator-client", "nym-wireguard", @@ -6754,7 +6755,6 @@ dependencies = [ "lazy_static", "log", "nym-bin-common", - "nym-client-core", "nym-config", "nym-contracts-common", "nym-crypto", @@ -6768,6 +6768,7 @@ dependencies = [ "nym-sphinx-types", "nym-task", "nym-topology", + "nym-topology-control", "nym-types", "nym-validator-client", "opentelemetry", @@ -7106,6 +7107,7 @@ dependencies = [ "nym-sphinx", "nym-task", "nym-topology", + "nym-topology-control", "nym-validator-client", "parking_lot 0.12.1", "pretty_env_logger", @@ -7171,6 +7173,7 @@ dependencies = [ "nym-socks5-client-core", "nym-sphinx", "nym-topology", + "nym-topology-control", "pretty_env_logger", "rand 0.7.3", "serde", @@ -7483,6 +7486,27 @@ dependencies = [ "wasm-utils", ] +[[package]] +name = "nym-topology-control" +version = "1.1.15" +dependencies = [ + "async-trait", + "futures", + "log", + "nym-explorer-client", + "nym-network-defaults", + "nym-sphinx", + "nym-task", + "nym-topology", + "nym-validator-client", + "rand 0.7.3", + "serde", + "tap", + "tokio", + "tokio-stream", + "url", +] + [[package]] name = "nym-tun" version = "0.1.0" diff --git a/clients/socks5/Cargo.toml b/clients/socks5/Cargo.toml index 72a94c4c2d9..810f53488d4 100644 --- a/clients/socks5/Cargo.toml +++ b/clients/socks5/Cargo.toml @@ -34,6 +34,7 @@ nym-sphinx = { path = "../../common/nymsphinx" } nym-ordered-buffer = { path = "../../common/socks5/ordered-buffer" } nym-pemstore = { path = "../../common/pemstore" } nym-topology = { path = "../../common/topology" } +nym-topology-control = { path = "../../common/topology-control" } nym-socks5-client-core = { path = "../../common/socks5-client-core" } [features] diff --git a/clients/socks5/src/commands/mod.rs b/clients/socks5/src/commands/mod.rs index 7bd5205791f..dcda28fb3dc 100644 --- a/clients/socks5/src/commands/mod.rs +++ b/clients/socks5/src/commands/mod.rs @@ -17,11 +17,11 @@ use nym_client_core::client::base_client::storage::gateway_details::{ OnDiskGatewayDetails, PersistedGatewayDetails, }; use nym_client_core::client::key_manager::persistence::OnDiskKeys; -use nym_client_core::client::topology_control::geo_aware_provider::CountryGroup; -use nym_client_core::config::{GatewayEndpointConfig, GroupBy, TopologyStructure}; +use nym_client_core::config::{GatewayEndpointConfig, TopologyStructure}; use nym_client_core::error::ClientCoreError; use nym_config::OptionalSet; use nym_sphinx::params::{PacketSize, PacketType}; +use nym_topology_control::geo_aware_provider::{CountryGroup, GroupBy}; use std::error::Error; use std::net::IpAddr; diff --git a/clients/socks5/src/commands/run.rs b/clients/socks5/src/commands/run.rs index 5638c80d9ba..edb1c6ec875 100644 --- a/clients/socks5/src/commands/run.rs +++ b/clients/socks5/src/commands/run.rs @@ -12,9 +12,9 @@ use log::*; use nym_bin_common::version_checker::is_minor_version_compatible; use nym_client_core::cli_helpers::client_run::CommonClientRunArgs; use nym_client_core::client::base_client::storage::OnDiskPersistent; -use nym_client_core::client::topology_control::geo_aware_provider::CountryGroup; use nym_socks5_client_core::NymClient; use nym_sphinx::addressing::clients::Recipient; +use nym_topology_control::geo_aware_provider::CountryGroup; use std::net::IpAddr; #[derive(Args, Clone)] diff --git a/common/client-core/Cargo.toml b/common/client-core/Cargo.toml index dc6c9bba2b2..a9776ff911f 100644 --- a/common/client-core/Cargo.toml +++ b/common/client-core/Cargo.toml @@ -42,6 +42,7 @@ nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" } nym-sphinx = { path = "../nymsphinx" } nym-pemstore = { path = "../pemstore" } nym-topology = { path = "../topology", features = ["serializable"] } +nym-topology-control = { path = "../topology-control" } nym-validator-client = { path = "../client-libs/validator-client", default-features = false } nym-task = { path = "../task" } nym-credential-storage = { path = "../credential-storage" } diff --git a/common/client-core/src/client/base_client/mod.rs b/common/client-core/src/client/base_client/mod.rs index 47b119261a6..56e33c2aa56 100644 --- a/common/client-core/src/client/base_client/mod.rs +++ b/common/client-core/src/client/base_client/mod.rs @@ -3,7 +3,6 @@ use super::packet_statistics_control::PacketStatisticsReporter; use super::received_buffer::ReceivedBufferMessage; -use super::topology_control::geo_aware_provider::GeoAwareTopologyProvider; use crate::client::base_client::storage::gateway_details::GatewayDetailsStore; use crate::client::base_client::storage::MixnetClientStorage; use crate::client::cover_traffic_stream::LoopCoverTrafficStream; @@ -22,10 +21,6 @@ use crate::client::replies::reply_controller::{ReplyControllerReceiver, ReplyCon use crate::client::replies::reply_storage::{ CombinedReplyStorage, PersistentReplyStorage, ReplyStorageBackend, SentReplyKeys, }; -use crate::client::topology_control::nym_api_provider::NymApiTopologyProvider; -use crate::client::topology_control::{ - TopologyAccessor, TopologyRefresher, TopologyRefresherConfig, -}; use crate::config::{Config, DebugConfig}; use crate::error::ClientCoreError; use crate::init::{ @@ -50,6 +45,9 @@ use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender, use nym_task::{TaskClient, TaskHandle}; use nym_topology::provider_trait::TopologyProvider; use nym_topology::HardcodedTopologyProvider; +use nym_topology_control::geo_aware_provider::GeoAwareTopologyProvider; +use nym_topology_control::nym_api_provider::NymApiTopologyProvider; +use nym_topology_control::{TopologyAccessor, TopologyRefresher, TopologyRefresherConfig}; use nym_validator_client::nyxd::contract_traits::DkgQueryClient; use std::fmt::Debug; use std::path::Path; diff --git a/common/client-core/src/client/cover_traffic_stream.rs b/common/client-core/src/client/cover_traffic_stream.rs index 101a0a77548..d650fb16f91 100644 --- a/common/client-core/src/client/cover_traffic_stream.rs +++ b/common/client-core/src/client/cover_traffic_stream.rs @@ -3,7 +3,6 @@ use crate::client::mix_traffic::BatchMixMessageSender; use crate::client::packet_statistics_control::{PacketStatisticsEvent, PacketStatisticsReporter}; -use crate::client::topology_control::TopologyAccessor; use crate::{config, spawn_future}; use futures::task::{Context, Poll}; use futures::{Future, Stream, StreamExt}; @@ -13,6 +12,7 @@ use nym_sphinx::addressing::clients::Recipient; use nym_sphinx::cover::generate_loop_cover_packet; use nym_sphinx::params::{PacketSize, PacketType}; use nym_sphinx::utils::sample_poisson_duration; +use nym_topology_control::TopologyAccessor; use rand::{rngs::OsRng, CryptoRng, Rng}; use std::pin::Pin; use std::sync::Arc; diff --git a/common/client-core/src/client/mod.rs b/common/client-core/src/client/mod.rs index 9a3a3a33c99..93340e50cfb 100644 --- a/common/client-core/src/client/mod.rs +++ b/common/client-core/src/client/mod.rs @@ -11,5 +11,4 @@ pub(crate) mod packet_statistics_control; pub mod real_messages_control; pub mod received_buffer; pub mod replies; -pub mod topology_control; pub(crate) mod transmission_buffer; diff --git a/common/client-core/src/client/real_messages_control/message_handler.rs b/common/client-core/src/client/real_messages_control/message_handler.rs index 42a19938440..11bb514c661 100644 --- a/common/client-core/src/client/real_messages_control/message_handler.rs +++ b/common/client-core/src/client/real_messages_control/message_handler.rs @@ -7,7 +7,6 @@ use crate::client::real_messages_control::real_traffic_stream::{ }; use crate::client::real_messages_control::{AckActionSender, Action}; use crate::client::replies::reply_storage::{ReceivedReplySurbsMap, SentReplyKeys, UsedSenderTags}; -use crate::client::topology_control::{TopologyAccessor, TopologyReadPermit}; use log::{debug, error, info, trace, warn}; use nym_sphinx::acknowledgements::AckKey; use nym_sphinx::addressing::clients::Recipient; @@ -20,6 +19,7 @@ use nym_sphinx::preparer::{MessagePreparer, PreparedFragment}; use nym_sphinx::Delay; use nym_task::connections::TransmissionLane; use nym_topology::{NymTopology, NymTopologyError}; +use nym_topology_control::{TopologyAccessor, TopologyReadPermit}; use rand::{CryptoRng, Rng}; use std::collections::HashMap; use std::sync::Arc; diff --git a/common/client-core/src/client/real_messages_control/mod.rs b/common/client-core/src/client/real_messages_control/mod.rs index d4914f14406..1aef76ebe77 100644 --- a/common/client-core/src/client/real_messages_control/mod.rs +++ b/common/client-core/src/client/real_messages_control/mod.rs @@ -17,7 +17,6 @@ use crate::{ client::{ inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender, real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors, - topology_control::TopologyAccessor, }, spawn_future, }; @@ -28,6 +27,7 @@ use nym_sphinx::acknowledgements::AckKey; use nym_sphinx::addressing::clients::Recipient; use nym_sphinx::params::PacketType; use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths}; +use nym_topology_control::TopologyAccessor; use rand::{rngs::OsRng, CryptoRng, Rng}; use std::sync::Arc; diff --git a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs index ef91208f1ba..a742b20e203 100644 --- a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs +++ b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs @@ -5,7 +5,6 @@ use self::sending_delay_controller::SendingDelayController; use crate::client::mix_traffic::BatchMixMessageSender; use crate::client::packet_statistics_control::{PacketStatisticsEvent, PacketStatisticsReporter}; use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender; -use crate::client::topology_control::TopologyAccessor; use crate::client::transmission_buffer::TransmissionBuffer; use crate::config; use futures::task::{Context, Poll}; @@ -22,6 +21,7 @@ use nym_sphinx::utils::sample_poisson_duration; use nym_task::connections::{ ConnectionCommand, ConnectionCommandReceiver, ConnectionId, LaneQueueLengths, TransmissionLane, }; +use nym_topology_control::TopologyAccessor; use rand::{CryptoRng, Rng}; use std::pin::Pin; use std::sync::Arc; diff --git a/common/client-core/src/config/mod.rs b/common/client-core/src/config/mod.rs index 3f4bbb4efec..a52b5f8c4dd 100644 --- a/common/client-core/src/config/mod.rs +++ b/common/client-core/src/config/mod.rs @@ -1,14 +1,12 @@ // Copyright 2021-2023 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 -use crate::{client::topology_control::geo_aware_provider::CountryGroup, error::ClientCoreError}; +use crate::error::ClientCoreError; use nym_config::defaults::NymNetworkDetails; use nym_crypto::asymmetric::identity; use nym_gateway_client::client::GatewayConfig; -use nym_sphinx::{ - addressing::clients::Recipient, - params::{PacketSize, PacketType}, -}; +use nym_sphinx::params::{PacketSize, PacketType}; +use nym_topology_control::geo_aware_provider::GroupBy; use serde::{Deserialize, Serialize}; use std::time::Duration; use url::Url; @@ -541,22 +539,6 @@ pub enum TopologyStructure { GeoAware(GroupBy), } -#[allow(clippy::large_enum_variant)] -#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)] -pub enum GroupBy { - CountryGroup(CountryGroup), - NymAddress(Recipient), -} - -impl std::fmt::Display for GroupBy { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - GroupBy::CountryGroup(group) => write!(f, "group: {}", group), - GroupBy::NymAddress(address) => write!(f, "address: {}", address), - } - } -} - impl Default for Topology { fn default() -> Self { Topology { diff --git a/common/client-core/src/config/old_config_v1_1_30.rs b/common/client-core/src/config/old_config_v1_1_30.rs index 87dae0eb3f4..607fabc7f79 100644 --- a/common/client-core/src/config/old_config_v1_1_30.rs +++ b/common/client-core/src/config/old_config_v1_1_30.rs @@ -1,15 +1,15 @@ // Copyright 2023 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 -use crate::client::topology_control::geo_aware_provider::CountryGroup; use crate::config::{ - Acknowledgements, Client, Config, CoverTraffic, DebugConfig, GatewayConnection, GroupBy, - ReplySurbs, Topology, TopologyStructure, Traffic, + Acknowledgements, Client, Config, CoverTraffic, DebugConfig, GatewayConnection, ReplySurbs, + Topology, TopologyStructure, Traffic, }; use nym_sphinx::{ addressing::clients::Recipient, params::{PacketSize, PacketType}, }; +use nym_topology_control::geo_aware_provider::{CountryGroup, GroupBy}; use serde::{Deserialize, Serialize}; use std::time::Duration; use url::Url; diff --git a/common/topology-control/Cargo.toml b/common/topology-control/Cargo.toml new file mode 100644 index 00000000000..016c21b7488 --- /dev/null +++ b/common/topology-control/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "nym-topology-control" +version = "1.1.15" +authors = ["Dave Hrycyszyn "] +edition = "2021" +rust-version = "1.66" +license.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = { workspace = true } +futures = { workspace = true } +log = { workspace = true } +rand = { version = "0.7.3", features = ["wasm-bindgen"] } +serde = { workspace = true, features = ["derive"] } +tap = "1.0.1" +url = { workspace = true, features = ["serde"] } +tokio = { workspace = true, features = ["macros"]} + +# internal + +nym-explorer-client = { path = "../../explorer-api/explorer-client" } +nym-sphinx = { path = "../nymsphinx" } +nym-topology = { path = "../topology", features = ["serializable"] } +nym-validator-client = { path = "../client-libs/validator-client", default-features = false } +nym-task = { path = "../task" } +nym-network-defaults = { path = "../network-defaults" } + + +[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream] +version = "0.1.11" +features = ["time"] + diff --git a/common/client-core/src/client/topology_control/accessor.rs b/common/topology-control/src/accessor.rs similarity index 99% rename from common/client-core/src/client/topology_control/accessor.rs rename to common/topology-control/src/accessor.rs index f4b50be0594..da463a4e91a 100644 --- a/common/client-core/src/client/topology_control/accessor.rs +++ b/common/topology-control/src/accessor.rs @@ -49,7 +49,7 @@ impl<'a> Deref for TopologyReadPermit<'a> { impl<'a> TopologyReadPermit<'a> { /// Using provided topology read permit, tries to get an immutable reference to the underlying /// topology. For obvious reasons the lifetime of the topology reference is bound to the permit. - pub(crate) fn try_get_valid_topology_ref( + pub fn try_get_valid_topology_ref( &'a self, ack_recipient: &Recipient, packet_recipient: Option<&Recipient>, diff --git a/common/client-core/src/client/topology_control/geo_aware_provider.rs b/common/topology-control/src/geo_aware_provider.rs similarity index 95% rename from common/client-core/src/client/topology_control/geo_aware_provider.rs rename to common/topology-control/src/geo_aware_provider.rs index 71de0327b59..d61309a7551 100644 --- a/common/client-core/src/client/topology_control/geo_aware_provider.rs +++ b/common/topology-control/src/geo_aware_provider.rs @@ -3,6 +3,7 @@ use std::{collections::HashMap, fmt}; use log::{debug, error, info}; use nym_explorer_client::{ExplorerClient, PrettyDetailedMixNodeBond}; use nym_network_defaults::var_names::EXPLORER_API; +use nym_sphinx::addressing::clients::Recipient; use nym_topology::{ nym_topology_from_detailed, provider_trait::{async_trait, TopologyProvider}, @@ -14,8 +15,6 @@ use serde::{Deserialize, Serialize}; use tap::TapOptional; use url::Url; -use crate::config::GroupBy; - const MIN_NODES_PER_LAYER: usize = 1; fn create_explorer_client() -> Option { @@ -38,6 +37,22 @@ fn create_explorer_client() -> Option { Some(client) } +#[allow(clippy::large_enum_variant)] +#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum GroupBy { + CountryGroup(CountryGroup), + NymAddress(Recipient), +} + +impl std::fmt::Display for GroupBy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + GroupBy::CountryGroup(group) => write!(f, "group: {}", group), + GroupBy::NymAddress(address) => write!(f, "address: {}", address), + } + } +} + #[derive(Copy, Clone, Hash, PartialEq, Eq, Serialize, Deserialize, Debug)] pub enum CountryGroup { Europe, diff --git a/common/client-core/src/client/topology_control/mod.rs b/common/topology-control/src/lib.rs similarity index 97% rename from common/client-core/src/client/topology_control/mod.rs rename to common/topology-control/src/lib.rs index fa54a0ca7f6..ead03a8c0b6 100644 --- a/common/client-core/src/client/topology_control/mod.rs +++ b/common/topology-control/src/lib.rs @@ -1,11 +1,11 @@ // Copyright 2021-2023 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 -use crate::spawn_future; -pub(crate) use accessor::{TopologyAccessor, TopologyReadPermit}; +pub use accessor::{TopologyAccessor, TopologyReadPermit}; use futures::StreamExt; use log::*; use nym_sphinx::addressing::nodes::NodeIdentity; +use nym_task::spawn; use nym_topology::provider_trait::TopologyProvider; use nym_topology::NymTopologyError; use std::time::Duration; @@ -142,7 +142,7 @@ impl TopologyRefresher { } pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) { - spawn_future(async move { + spawn(async move { debug!("Started TopologyRefresher with graceful shutdown support"); #[cfg(not(target_arch = "wasm32"))] diff --git a/common/client-core/src/client/topology_control/nym_api_provider.rs b/common/topology-control/src/nym_api_provider.rs similarity index 100% rename from common/client-core/src/client/topology_control/nym_api_provider.rs rename to common/topology-control/src/nym_api_provider.rs diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index 4743d52addd..2e62aec499d 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -78,7 +78,7 @@ nym-task = { path = "../common/task" } nym-types = { path = "../common/types" } nym-validator-client = { path = "../common/client-libs/validator-client" } nym-ip-packet-router = { path = "../service-providers/ip-packet-router" } -nym-client-core = { path = "../common/client-core"} +nym-topology-control = { path = "../common/topology-control" } nym-topology = { path = "../common/topology" } nym-wireguard = { path = "../common/wireguard", optional = true } diff --git a/gateway/src/node/mod.rs b/gateway/src/node/mod.rs index 21a98a92efc..b65a64800fb 100644 --- a/gateway/src/node/mod.rs +++ b/gateway/src/node/mod.rs @@ -24,10 +24,6 @@ use anyhow::bail; use dashmap::DashMap; use futures::channel::{mpsc, oneshot}; use log::*; -use nym_client_core::client::topology_control::accessor::TopologyAccessor; -use nym_client_core::client::topology_control::nym_api_provider::NymApiTopologyProvider; -use nym_client_core::client::topology_control::TopologyRefresher; -use nym_client_core::client::topology_control::TopologyRefresherConfig; use nym_crypto::asymmetric::{encryption, identity}; use nym_mixnet_client::forwarder::{MixForwardingSender, PacketForwarder}; use nym_network_defaults::NymNetworkDetails; @@ -36,6 +32,10 @@ use nym_node::wireguard::types::GatewayClientRegistry; use nym_statistics_common::collector::StatisticsSender; use nym_task::{TaskClient, TaskManager}; use nym_topology::provider_trait::TopologyProvider; +use nym_topology_control::accessor::TopologyAccessor; +use nym_topology_control::nym_api_provider::NymApiTopologyProvider; +use nym_topology_control::TopologyRefresher; +use nym_topology_control::TopologyRefresherConfig; use nym_validator_client::{nyxd, DirectSigningHttpRpcNyxdClient}; use rand::seq::SliceRandom; use rand::thread_rng; diff --git a/mixnode/Cargo.toml b/mixnode/Cargo.toml index 7597e191770..d7fca9ca140 100644 --- a/mixnode/Cargo.toml +++ b/mixnode/Cargo.toml @@ -59,7 +59,7 @@ nym-pemstore = { path = "../common/pemstore", version = "0.3.0" } nym-task = { path = "../common/task" } nym-types = { path = "../common/types" } nym-topology = { path = "../common/topology" } -nym-client-core = { path = "../common/client-core/" } +nym-topology-control = { path = "../common/topology-control" } nym-validator-client = { path = "../common/client-libs/validator-client" } nym-bin-common = { path = "../common/bin-common", features = ["output_format"] } cpu-cycles = { path = "../cpu-cycles", optional = true } diff --git a/mixnode/src/node/mod.rs b/mixnode/src/node/mod.rs index ee9567f7e9a..a990b98b4ad 100644 --- a/mixnode/src/node/mod.rs +++ b/mixnode/src/node/mod.rs @@ -15,14 +15,14 @@ use crate::node::packet_delayforwarder::{DelayForwarder, PacketDelayForwardSende use log::{error, info, warn}; use nym_bin_common::output_format::OutputFormat; use nym_bin_common::version_checker::parse_version; -use nym_client_core::client::topology_control::accessor::TopologyAccessor; -use nym_client_core::client::topology_control::nym_api_provider::NymApiTopologyProvider; -use nym_client_core::client::topology_control::TopologyRefresher; -use nym_client_core::client::topology_control::TopologyRefresherConfig; use nym_crypto::asymmetric::{encryption, identity}; use nym_mixnode_common::verloc::{self, AtomicVerlocResult, VerlocMeasurer}; use nym_task::{TaskClient, TaskManager}; use nym_topology::provider_trait::TopologyProvider; +use nym_topology_control::accessor::TopologyAccessor; +use nym_topology_control::nym_api_provider::NymApiTopologyProvider; +use nym_topology_control::TopologyRefresher; +use nym_topology_control::TopologyRefresherConfig; use rand::seq::SliceRandom; use rand::thread_rng; use std::net::SocketAddr; diff --git a/sdk/rust/nym-sdk/Cargo.toml b/sdk/rust/nym-sdk/Cargo.toml index c0a0960f79e..5dbd239da40 100644 --- a/sdk/rust/nym-sdk/Cargo.toml +++ b/sdk/rust/nym-sdk/Cargo.toml @@ -20,6 +20,7 @@ nym-network-defaults = { path = "../../../common/network-defaults" } nym-sphinx = { path = "../../../common/nymsphinx" } nym-task = { path = "../../../common/task" } nym-topology = { path = "../../../common/topology" } +nym-topology-control = { path = "../../../common/topology-control" } nym-socks5-client-core = { path = "../../../common/socks5-client-core" } nym-validator-client = { path = "../../../common/client-libs/validator-client", features = ["http-client"] } nym-socks5-requests = { path = "../../../common/socks5/requests" } diff --git a/sdk/rust/nym-sdk/src/mixnet.rs b/sdk/rust/nym-sdk/src/mixnet.rs index 9dc4dd9681a..dbd52815118 100644 --- a/sdk/rust/nym-sdk/src/mixnet.rs +++ b/sdk/rust/nym-sdk/src/mixnet.rs @@ -54,9 +54,8 @@ pub use nym_client_core::{ fs_backend::Backend as ReplyStorage, CombinedReplyStorage, Empty as EmptyReplyStorage, ReplyStorageBackend, }, - topology_control::geo_aware_provider::{CountryGroup, GeoAwareTopologyProvider}, }, - config::{GatewayEndpointConfig, GroupBy}, + config::GatewayEndpointConfig, }; pub use nym_credential_storage::{ ephemeral_storage::EphemeralStorage as EphemeralCredentialStorage, models::CoconutCredential, @@ -73,6 +72,9 @@ pub use nym_sphinx::{ receiver::ReconstructedMessage, }; pub use nym_topology::{provider_trait::TopologyProvider, NymTopology}; +pub use nym_topology_control::geo_aware_provider::{ + CountryGroup, GeoAwareTopologyProvider, GroupBy, +}; pub use paths::StoragePaths; pub use socks5_client::Socks5MixnetClient; pub use traits::MixnetMessageSender; From 40979324df62731bc100a08bb01e00f28076d8f5 Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Tue, 6 Feb 2024 10:44:27 +0100 Subject: [PATCH 2/9] apply octlol suggestion --- Cargo.lock | 2 +- common/topology-control/Cargo.toml | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e3d04138478..29e151126bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7488,7 +7488,7 @@ dependencies = [ [[package]] name = "nym-topology-control" -version = "1.1.15" +version = "0.1.0" dependencies = [ "async-trait", "futures", diff --git a/common/topology-control/Cargo.toml b/common/topology-control/Cargo.toml index 016c21b7488..06925acf2c8 100644 --- a/common/topology-control/Cargo.toml +++ b/common/topology-control/Cargo.toml @@ -1,10 +1,10 @@ [package] name = "nym-topology-control" -version = "1.1.15" -authors = ["Dave Hrycyszyn "] -edition = "2021" -rust-version = "1.66" +version = "0.1.0" +edition.workspace = true +authors.workspace = true license.workspace = true +repository.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html From 065f872509823dd4f58f4fce168cf5ac1b1463e2 Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Tue, 6 Feb 2024 11:24:12 +0100 Subject: [PATCH 3/9] changes on nym-connect and wasm client --- Cargo.lock | 2 ++ common/topology-control/Cargo.toml | 8 +++++ nym-connect/desktop/Cargo.lock | 37 ++++++++++++++++++++++ nym-connect/desktop/src-tauri/Cargo.toml | 1 + nym-connect/desktop/src-tauri/src/tasks.rs | 3 +- 5 files changed, 50 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 29e151126bd..6967680d078 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7492,6 +7492,7 @@ version = "0.1.0" dependencies = [ "async-trait", "futures", + "gloo-timers", "log", "nym-explorer-client", "nym-network-defaults", @@ -7505,6 +7506,7 @@ dependencies = [ "tokio", "tokio-stream", "url", + "wasmtimer", ] [[package]] diff --git a/common/topology-control/Cargo.toml b/common/topology-control/Cargo.toml index 06925acf2c8..e4ea9a32e7e 100644 --- a/common/topology-control/Cargo.toml +++ b/common/topology-control/Cargo.toml @@ -32,3 +32,11 @@ nym-network-defaults = { path = "../network-defaults" } version = "0.1.11" features = ["time"] + +[target."cfg(target_arch = \"wasm32\")".dependencies.wasmtimer] +workspace = true +features = ["tokio"] + +[target."cfg(target_arch = \"wasm32\")".dependencies.gloo-timers] +version = "0.2.4" +features = ["futures"] \ No newline at end of file diff --git a/nym-connect/desktop/Cargo.lock b/nym-connect/desktop/Cargo.lock index 6de58c09933..842526b316c 100644 --- a/nym-connect/desktop/Cargo.lock +++ b/nym-connect/desktop/Cargo.lock @@ -958,6 +958,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" +[[package]] +name = "const-str" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aca749d3d3f5b87a0d6100509879f9cf486ab510803a4a4e1001da1ff61c2bd6" + [[package]] name = "constant_time_eq" version = "0.3.0" @@ -3722,6 +3728,7 @@ dependencies = [ "clap", "clap_complete", "clap_complete_fig", + "const-str", "log", "pretty_env_logger", "schemars", @@ -3756,12 +3763,14 @@ dependencies = [ "nym-sphinx", "nym-task", "nym-topology", + "nym-topology-control", "nym-validator-client", "rand 0.7.3", "reqwest", "serde", "serde_json", "sha2 0.10.8", + "si-scale", "sqlx", "tap", "thiserror", @@ -3867,6 +3876,7 @@ dependencies = [ "nym-sphinx", "nym-task", "nym-topology", + "nym-topology-control", "nym-validator-client", "pretty_env_logger", "rand 0.7.3", @@ -4466,6 +4476,27 @@ dependencies = [ "thiserror", ] +[[package]] +name = "nym-topology-control" +version = "0.1.0" +dependencies = [ + "async-trait", + "futures", + "log", + "nym-explorer-client", + "nym-network-defaults", + "nym-sphinx", + "nym-task", + "nym-topology", + "nym-validator-client", + "rand 0.7.3", + "serde", + "tap", + "tokio", + "tokio-stream", + "url", +] + [[package]] name = "nym-validator-client" version = "0.1.0" @@ -6219,6 +6250,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "si-scale" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44beb68bf488343b13ddbd74d1d5d5e6559a58b6dfaee74eb8d5ed4f7ed7666f" + [[package]] name = "signal-hook" version = "0.3.17" diff --git a/nym-connect/desktop/src-tauri/Cargo.toml b/nym-connect/desktop/src-tauri/Cargo.toml index 193d3a53a3d..3aa3dcc907d 100644 --- a/nym-connect/desktop/src-tauri/Cargo.toml +++ b/nym-connect/desktop/src-tauri/Cargo.toml @@ -61,6 +61,7 @@ nym-socks5-client-core = { path = "../../../common/socks5-client-core" } nym-sphinx = { path = "../../../common/nymsphinx" } nym-task = { path = "../../../common/task" } nym-topology = { path = "../../../common/topology" } +nym-topology-control = { path = "../../../common/topology-control" } nym-validator-client = { path = "../../../common/client-libs/validator-client" } [dev-dependencies] diff --git a/nym-connect/desktop/src-tauri/src/tasks.rs b/nym-connect/desktop/src-tauri/src/tasks.rs index d4264f22740..4ada6839463 100644 --- a/nym-connect/desktop/src-tauri/src/tasks.rs +++ b/nym-connect/desktop/src-tauri/src/tasks.rs @@ -4,9 +4,10 @@ use nym_client_core::{ client::base_client::storage::{ gateway_details::GatewayDetailsStore, MixnetClientStorage, OnDiskPersistent, }, - config::{GroupBy, TopologyStructure}, + config::TopologyStructure, error::ClientCoreStatusMessage, }; +use nym_topology_control::geo_aware_provider::GroupBy; use nym_socks5_client_core::{NymClient as Socks5NymClient, Socks5ControlMessageSender}; use nym_sphinx::params::PacketSize; use nym_task::manager::TaskStatus; From ac2c6e78fae3a79669be0b4ed312327cab5416fb Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Tue, 6 Feb 2024 13:45:43 +0100 Subject: [PATCH 4/9] cargo fmt --- nym-connect/desktop/src-tauri/src/tasks.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nym-connect/desktop/src-tauri/src/tasks.rs b/nym-connect/desktop/src-tauri/src/tasks.rs index 4ada6839463..e76e9bc32f4 100644 --- a/nym-connect/desktop/src-tauri/src/tasks.rs +++ b/nym-connect/desktop/src-tauri/src/tasks.rs @@ -7,10 +7,10 @@ use nym_client_core::{ config::TopologyStructure, error::ClientCoreStatusMessage, }; -use nym_topology_control::geo_aware_provider::GroupBy; use nym_socks5_client_core::{NymClient as Socks5NymClient, Socks5ControlMessageSender}; use nym_sphinx::params::PacketSize; use nym_task::manager::TaskStatus; +use nym_topology_control::geo_aware_provider::GroupBy; use std::sync::Arc; use tap::TapFallible; use tokio::sync::RwLock; From 5db4e805f80ea127469a229f6598167daaa27ac6 Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Tue, 30 Jan 2024 10:19:05 +0100 Subject: [PATCH 5/9] add main noise logic --- common/nymnoise/Cargo.toml | 21 +++ common/nymnoise/src/connection.rs | 72 ++++++++++ common/nymnoise/src/error.rs | 42 ++++++ common/nymnoise/src/lib.rs | 167 +++++++++++++++++++++++ common/nymnoise/src/stream.rs | 220 ++++++++++++++++++++++++++++++ 5 files changed, 522 insertions(+) create mode 100644 common/nymnoise/Cargo.toml create mode 100644 common/nymnoise/src/connection.rs create mode 100644 common/nymnoise/src/error.rs create mode 100644 common/nymnoise/src/lib.rs create mode 100644 common/nymnoise/src/stream.rs diff --git a/common/nymnoise/Cargo.toml b/common/nymnoise/Cargo.toml new file mode 100644 index 00000000000..5738e22c3e2 --- /dev/null +++ b/common/nymnoise/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "nym-noise" +version = "0.1.0" +authors = ["Simon Wicky "] +edition = "2021" + +[dependencies] +snow = "0.9.2" +futures = "0.3" +tokio = { version = "1.24.1", features = ["net","io-util","time"] } +tokio-util = { workspace = true, features = ["codec"] } +pin-project = "1" +log = "0.4.19" +sha2 = "0.10.7" +bytes = "1.0" +thiserror = "1.0.44" +semver = "0.11" + +# internal +nym-topology = { path = "../topology"} +nym-crypto = { path = "../crypto" } \ No newline at end of file diff --git a/common/nymnoise/src/connection.rs b/common/nymnoise/src/connection.rs new file mode 100644 index 00000000000..bdad0cdfefd --- /dev/null +++ b/common/nymnoise/src/connection.rs @@ -0,0 +1,72 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: Apache-2.0 + +use std::io; + +use pin_project::pin_project; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + net::TcpStream, +}; + +use crate::stream::NoiseStream; + +#[pin_project(project = ConnectionProj)] +pub enum Connection { + Tcp(#[pin] TcpStream), + Noise(#[pin] NoiseStream), +} + +impl Connection { + pub fn peer_addr(&self) -> Result { + match self { + Self::Noise(stream) => stream.peer_addr(), + Self::Tcp(stream) => stream.peer_addr(), + } + } +} + +impl AsyncRead for Connection { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + match self.project() { + ConnectionProj::Noise(stream) => stream.poll_read(cx, buf), + ConnectionProj::Tcp(stream) => stream.poll_read(cx, buf), + } + } +} + +impl AsyncWrite for Connection { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + match self.project() { + ConnectionProj::Noise(stream) => stream.poll_write(cx, buf), + ConnectionProj::Tcp(stream) => stream.poll_write(cx, buf), + } + } + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.project() { + ConnectionProj::Noise(stream) => stream.poll_flush(cx), + ConnectionProj::Tcp(stream) => stream.poll_flush(cx), + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.project() { + ConnectionProj::Noise(stream) => stream.poll_shutdown(cx), + ConnectionProj::Tcp(stream) => stream.poll_shutdown(cx), + } + } +} diff --git a/common/nymnoise/src/error.rs b/common/nymnoise/src/error.rs new file mode 100644 index 00000000000..208c5b8ef5b --- /dev/null +++ b/common/nymnoise/src/error.rs @@ -0,0 +1,42 @@ +// Copyright 2023 - Nym Technologies SA +// SPDX-License-Identifier: Apache-2.0 + +use snow::Error; +use std::io; +use std::num::TryFromIntError; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum NoiseError { + #[error("encountered a Noise decryption error")] + DecryptionError, + + #[error("encountered a Noise Protocol error - {0}")] + ProtocolError(Error), + #[error("encountered an IO error - {0}")] + IoError(#[from] io::Error), + + #[error("Incorrect state")] + IncorrectStateError, + + #[error("Handshake timeout")] + HandshakeTimeoutError(#[from] tokio::time::error::Elapsed), + + #[error("Handshake did not complete")] + HandshakeError, + + #[error(transparent)] + IntConversionError(#[from] TryFromIntError), + + #[error("unable to extract public key - {0}")] + EncryptionKeyConversionError(#[from] nym_crypto::asymmetric::encryption::KeyRecoveryError), +} + +impl From for NoiseError { + fn from(err: Error) -> Self { + match err { + Error::Decrypt => NoiseError::DecryptionError, + err => NoiseError::ProtocolError(err), + } + } +} diff --git a/common/nymnoise/src/lib.rs b/common/nymnoise/src/lib.rs new file mode 100644 index 00000000000..18d53ebc4b6 --- /dev/null +++ b/common/nymnoise/src/lib.rs @@ -0,0 +1,167 @@ +// Copyright 2023 - Nym Technologies SA +// SPDX-License-Identifier: Apache-2.0 + +use crate::connection::Connection; +use crate::error::NoiseError; +use crate::stream::{NoisePattern, NoiseStream}; +use log::*; +use nym_crypto::asymmetric::encryption; +use nym_topology::NymTopology; +use sha2::{Digest, Sha256}; +use snow::{error::Prerequisite, Builder, Error}; +use tokio::net::TcpStream; + +pub mod connection; +pub mod error; +pub mod stream; + +pub async fn upgrade_noise_initiator( + conn: TcpStream, + pattern: NoisePattern, + local_public_key: Option<&[u8]>, + local_private_key: &[u8], + remote_pub_key: &[u8], + epoch: u32, +) -> Result { + trace!("Perform Noise Handshake, initiator side"); + + //In case the local key cannot be known by the remote party, e.g. in a client-gateway connection + let secret = [ + local_public_key.unwrap_or(&[]), + remote_pub_key, + &epoch.to_be_bytes(), + ] + .concat(); + let secret_hash = Sha256::digest(secret); + + let handshake = Builder::new(pattern.as_str().parse()?) + .local_private_key(local_private_key) + .remote_public_key(remote_pub_key) + .psk(pattern.psk_position(), &secret_hash) + .build_initiator()?; + + let noise_stream = NoiseStream::new(conn, handshake); + + Ok(Connection::Noise(noise_stream.perform_handshake().await?)) +} + +pub async fn upgrade_noise_initiator_with_topology( + conn: TcpStream, + pattern: NoisePattern, + topology: &NymTopology, + epoch: u32, + local_public_key: &[u8], + local_private_key: &[u8], +) -> Result { + //Get init material + let responder_addr = match conn.peer_addr() { + Ok(addr) => addr, + Err(err) => { + error!("Unable to extract peer address from connection - {err}"); + return Err(Error::Prereq(Prerequisite::RemotePublicKey).into()); + } + }; + let remote_pub_key = match topology.find_node_key_by_mix_host(responder_addr) { + Ok(Some(key)) => encryption::PublicKey::from_base58_string(key)?.to_bytes(), + Ok(None) => { + warn!( + "{:?} can't speak Noise yet, falling back to TCP", + responder_addr + ); + return Ok(Connection::Tcp(conn)); + } + Err(_) => { + error!( + "Cannot find public key for node with address {:?}", + responder_addr + ); //Do we still pursue a TCP connection or not? + return Err(Error::Prereq(Prerequisite::RemotePublicKey).into()); + } + }; + + upgrade_noise_initiator( + conn, + pattern, + Some(local_public_key), + local_private_key, + &remote_pub_key, + epoch, + ) + .await +} + +pub async fn upgrade_noise_responder( + conn: TcpStream, + pattern: NoisePattern, + local_public_key: &[u8], + local_private_key: &[u8], + remote_pub_key: Option<&[u8]>, + epoch: u32, +) -> Result { + trace!("Perform Noise Handshake, responder side"); + + //If the remote_key cannot be kwnown, e.g. in a client-gateway connection + let secret = [ + remote_pub_key.unwrap_or(&[]), + local_public_key, + &epoch.to_be_bytes(), + ] + .concat(); + let secret_hash = Sha256::digest(secret); + + let handshake = Builder::new(pattern.as_str().parse()?) + .local_private_key(local_private_key) + .psk(pattern.psk_position(), &secret_hash) + .build_responder()?; + + let noise_stream = NoiseStream::new(conn, handshake); + + Ok(Connection::Noise(noise_stream.perform_handshake().await?)) +} + +pub async fn upgrade_noise_responder_with_topology( + conn: TcpStream, + pattern: NoisePattern, + topology: &NymTopology, + epoch: u32, + local_public_key: &[u8], + local_private_key: &[u8], +) -> Result { + //Get init material + let initiator_addr = match conn.peer_addr() { + Ok(addr) => addr, + Err(err) => { + error!("Unable to extract peer address from connection - {err}"); + return Err(Error::Prereq(Prerequisite::RemotePublicKey).into()); + } + }; + + //SW : for private gateway, we could try to perform the handshake without that key? + let remote_pub_key = match topology.find_node_key_by_mix_host(initiator_addr) { + Ok(Some(key)) => encryption::PublicKey::from_base58_string(key)?.to_bytes(), + Ok(None) => { + warn!( + "{:?} can't speak Noise yet, falling back to TCP", + initiator_addr + ); + return Ok(Connection::Tcp(conn)); + } + Err(_) => { + error!( + "Cannot find public key for node with address {:?}", + initiator_addr + ); //Do we still pursue a TCP connection with that node or not? + return Err(Error::Prereq(Prerequisite::RemotePublicKey).into()); + } + }; + + upgrade_noise_responder( + conn, + pattern, + local_public_key, + local_private_key, + Some(&remote_pub_key), + epoch, + ) + .await +} diff --git a/common/nymnoise/src/stream.rs b/common/nymnoise/src/stream.rs new file mode 100644 index 00000000000..01063527de6 --- /dev/null +++ b/common/nymnoise/src/stream.rs @@ -0,0 +1,220 @@ +// Copyright 2023 - Nym Technologies SA +// SPDX-License-Identifier: Apache-2.0 + +use crate::error::NoiseError; +use bytes::BytesMut; +use futures::{Sink, SinkExt, Stream, StreamExt}; +use pin_project::pin_project; +use snow::{HandshakeState, TransportState}; +use std::cmp::min; +use std::collections::VecDeque; +use std::io; +use std::pin::Pin; +use std::task::Poll; +use tokio::{ + io::{AsyncRead, AsyncWrite, ReadBuf}, + net::TcpStream, +}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; + +const MAXMSGLEN: usize = 65535; +const TAGLEN: usize = 16; + +#[derive(Default)] +pub enum NoisePattern { + #[default] + XKpsk3, + IKpsk2, +} + +impl NoisePattern { + pub(crate) fn as_str(&self) -> &'static str { + match self { + Self::XKpsk3 => "Noise_XKpsk3_25519_AESGCM_SHA256", + Self::IKpsk2 => "Noise_IKpsk2_25519_ChaChaPoly_BLAKE2s", //Wireguard handshake (not exactly though) + } + } + + pub(crate) fn psk_position(&self) -> u8 { + //automatic parsing, works for correct pattern, more convenient + match self.as_str().find("psk") { + Some(n) => { + let psk_index = n + 3; + let psk_char = self.as_str().chars().nth(psk_index).unwrap(); + psk_char.to_string().parse().unwrap() + //if this fails, it means hardcoded pattern are wrong + } + None => 0, + } + } +} + +/// Wrapper around a TcpStream +#[pin_project] +pub struct NoiseStream { + #[pin] + inner_stream: Framed, + handshake: Option, + noise: Option, + dec_buffer: VecDeque, +} + +impl NoiseStream { + pub(crate) fn new(inner_stream: TcpStream, handshake: HandshakeState) -> NoiseStream { + NoiseStream { + inner_stream: LengthDelimitedCodec::builder() + .length_field_type::() + .new_framed(inner_stream), + handshake: Some(handshake), + noise: None, + dec_buffer: VecDeque::with_capacity(MAXMSGLEN), + } + } + + pub(crate) async fn perform_handshake(mut self) -> Result { + //Check if we are in the correct state + let Some(mut handshake) = self.handshake else { + return Err(NoiseError::IncorrectStateError); + }; + self.handshake = None; + + while !handshake.is_handshake_finished() { + if handshake.is_my_turn() { + self.send_handshake_msg(&mut handshake).await?; + } else { + self.recv_handshake_msg(&mut handshake).await?; + } + } + + self.noise = Some(handshake.into_transport_mode()?); + Ok(self) + } + + async fn send_handshake_msg( + &mut self, + handshake: &mut HandshakeState, + ) -> Result<(), NoiseError> { + let mut buf = BytesMut::zeroed(MAXMSGLEN + TAGLEN); + let len = handshake.write_message(&[], &mut buf)?; + buf.truncate(len); + self.inner_stream.send(buf.into()).await?; + Ok(()) + } + + async fn recv_handshake_msg( + &mut self, + handshake: &mut HandshakeState, + ) -> Result<(), NoiseError> { + match self.inner_stream.next().await { + Some(Ok(msg)) => { + let mut buf = vec![0u8; MAXMSGLEN]; + handshake.read_message(&msg, &mut buf)?; + Ok(()) + } + Some(Err(err)) => Err(NoiseError::IoError(err)), + None => Err(NoiseError::HandshakeError), + } + } + + pub fn peer_addr(&self) -> Result { + self.inner_stream.get_ref().peer_addr() + } +} + +impl AsyncRead for NoiseStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let projected_self = self.project(); + + match projected_self.inner_stream.poll_next(cx) { + Poll::Pending => { + //no new data, waking is already scheduled. + //Nothing new to decrypt, only check if we can return something from dec_storage, happens after + } + + Poll::Ready(Some(Ok(noise_msg))) => { + //We have a new moise msg + let mut dec_msg = vec![0u8; MAXMSGLEN]; + let len = match projected_self.noise { + Some(transport_state) => { + match transport_state.read_message(&noise_msg, &mut dec_msg) { + Ok(len) => len, + Err(_) => return Poll::Ready(Err(io::ErrorKind::InvalidInput.into())), + } + } + None => return Poll::Ready(Err(io::ErrorKind::Other.into())), + }; + projected_self.dec_buffer.extend(&dec_msg[..len]); + } + + Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)), + + //Stream is done, return Ok with nothing in buf + Poll::Ready(None) => return Poll::Ready(Ok(())), + } + + //check and return what we can + let read_len = min(buf.remaining(), projected_self.dec_buffer.len()); + if read_len > 0 { + buf.put_slice( + &projected_self + .dec_buffer + .drain(..read_len) + .collect::>(), + ); + return Poll::Ready(Ok(())); + } + + //If we end up here, it must mean the previous poll_next was pending as well, otherwise something was returned. Hence waking is already scheduled + Poll::Pending + } +} + +impl AsyncWrite for NoiseStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll> { + let mut projected_self = self.project(); + + match projected_self.inner_stream.as_mut().poll_ready(cx) { + Poll::Pending => Poll::Pending, + + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + + Poll::Ready(Ok(())) => { + let mut noise_buf = BytesMut::zeroed(MAXMSGLEN + TAGLEN); + + let Ok(len) = (match projected_self.noise { + Some(transport_state) => transport_state.write_message(buf, &mut noise_buf), + None => return Poll::Ready(Err(io::ErrorKind::Other.into())), + }) else { + return Poll::Ready(Err(io::ErrorKind::InvalidInput.into())); + }; + noise_buf.truncate(len); + match projected_self.inner_stream.start_send(noise_buf.into()) { + Ok(()) => Poll::Ready(Ok(buf.len())), + Err(e) => Poll::Ready(Err(e)), + } + } + } + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.project().inner_stream.poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.project().inner_stream.poll_close(cx) + } +} From b5fceaafda0ae7b5d2baab11c595b0b0a2120e62 Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Tue, 30 Jan 2024 10:19:28 +0100 Subject: [PATCH 6/9] add noise connection in nodes --- Cargo.lock | 46 +++++++--- common/client-libs/mixnet-client/Cargo.toml | 8 ++ .../client-libs/mixnet-client/src/client.rs | 85 +++++++++++++++++-- .../mixnet-client/src/forwarder.rs | 29 +++---- common/topology/src/lib.rs | 31 +++++++ gateway/Cargo.toml | 1 + .../receiver/connection_handler.rs | 53 +++++++++++- gateway/src/node/mod.rs | 32 ++++++- mixnode/Cargo.toml | 1 + .../node/listener/connection_handler/mod.rs | 56 ++++++++++-- mixnode/src/node/mod.rs | 28 +++++- 11 files changed, 324 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6967680d078..1690e2bfc67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6559,6 +6559,7 @@ dependencies = [ "nym-network-defaults", "nym-network-requester", "nym-node", + "nym-noise", "nym-pemstore", "nym-sphinx", "nym-statistics-common", @@ -6710,10 +6711,16 @@ version = "0.1.0" dependencies = [ "futures", "log", + "nym-client-core", + "nym-crypto", + "nym-noise", "nym-sphinx", "nym-task", + "nym-validator-client", + "rand 0.7.3", "tokio", "tokio-util", + "url", ] [[package]] @@ -6761,6 +6768,7 @@ dependencies = [ "nym-mixnet-client", "nym-mixnode-common", "nym-node", + "nym-noise", "nym-nonexhaustive-delayqueue", "nym-pemstore", "nym-sphinx", @@ -7015,6 +7023,24 @@ dependencies = [ "wasmtimer", ] +[[package]] +name = "nym-noise" +version = "0.1.0" +dependencies = [ + "bytes", + "futures", + "log", + "nym-crypto", + "nym-topology", + "pin-project", + "semver 0.11.0", + "sha2 0.10.8", + "snow", + "thiserror", + "tokio", + "tokio-util", +] + [[package]] name = "nym-nonexhaustive-delayqueue" version = "0.1.0" @@ -7819,9 +7845,9 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "openssl" -version = "0.10.62" +version = "0.10.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cde4d2d9200ad5909f8dac647e29482e07c3a35de8a13fce7c9c7747ad9f671" +checksum = "bac25ee399abb46215765b1cb35bc0212377e58a061560d8b29b024fd0430e7c" dependencies = [ "bitflags 2.4.1", "cfg-if", @@ -7860,9 +7886,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.98" +version = "0.9.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1665caf8ab2dc9aef43d1c0023bd904633a6a05cb30b0ad59bec2ae986e57a7" +checksum = "db4d56a4c0478783083cfafcc42493dd4a981d41669da64b4572a2a089b51b1d" dependencies = [ "cc", "libc", @@ -8617,9 +8643,9 @@ dependencies = [ [[package]] name = "psl" -version = "2.1.14" +version = "2.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "383703acfc34f7a00724846c14dc5ea4407c59e5aedcbbb18a1c0c1a23fe5013" +checksum = "fa35143bed048dcb22457ef82f8ba3008b842e9158e2cfcc904f5a4e2571cd4c" dependencies = [ "psl-types", ] @@ -9978,9 +10004,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.4.0" +version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64cd236ccc1b7a29e7e2739f27c0b2dd199804abc4290e32f59f3b68d6405c23" +checksum = "f5c9fdb6b00a489875b22efd4b78fe2b363b72265cc5f6eb2e2b9ee270e6140c" dependencies = [ "base64 0.21.4", "chrono", @@ -9995,9 +10021,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.4.0" +version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93634eb5f75a2323b16de4748022ac4297f9e76b6dced2be287a099f41b5e788" +checksum = "dbff351eb4b33600a2e138dfa0b10b65a238ea8ff8fb2387c422c5022a3e8298" dependencies = [ "darling 0.20.3", "proc-macro2", diff --git a/common/client-libs/mixnet-client/Cargo.toml b/common/client-libs/mixnet-client/Cargo.toml index 68e048d1f0b..505931fd736 100644 --- a/common/client-libs/mixnet-client/Cargo.toml +++ b/common/client-libs/mixnet-client/Cargo.toml @@ -16,3 +16,11 @@ tokio-util = { workspace = true, features = ["codec"] } # internal nym-sphinx = { path = "../../nymsphinx" } nym-task = { path = "../../task" } +nym-client-core = { path = "../../client-core" } +nym-noise = { path = "../../nymnoise"} +nym-crypto = { path = "../../crypto" } +nym-validator-client = { path = "../validator-client"} + +[dev-dependencies] +url = { workspace = true } +rand = "0.7.3" diff --git a/common/client-libs/mixnet-client/src/client.rs b/common/client-libs/mixnet-client/src/client.rs index 4da308144f1..b0f64a96ff6 100644 --- a/common/client-libs/mixnet-client/src/client.rs +++ b/common/client-libs/mixnet-client/src/client.rs @@ -4,11 +4,15 @@ use futures::channel::mpsc; use futures::StreamExt; use log::*; +use nym_client_core::client::topology_control::accessor::TopologyAccessor; +use nym_crypto::asymmetric::encryption; +use nym_noise::upgrade_noise_initiator_with_topology; use nym_sphinx::addressing::nodes::NymNodeRoutingAddress; use nym_sphinx::framing::codec::NymCodec; use nym_sphinx::framing::packet::FramedNymPacket; use nym_sphinx::params::PacketType; use nym_sphinx::NymPacket; +use nym_validator_client::NymApiClient; use std::collections::HashMap; use std::io; use std::net::SocketAddr; @@ -59,6 +63,9 @@ pub trait SendWithoutResponse { pub struct Client { conn_new: HashMap, config: Config, + topology_access: TopologyAccessor, + api_client: NymApiClient, + local_identity: Arc, } struct ConnectionSender { @@ -76,10 +83,18 @@ impl ConnectionSender { } impl Client { - pub fn new(config: Config) -> Client { + pub fn new( + config: Config, + topology_access: TopologyAccessor, + api_client: NymApiClient, + local_identity: Arc, + ) -> Client { Client { conn_new: HashMap::new(), config, + topology_access, + api_client, + local_identity, } } @@ -88,6 +103,9 @@ impl Client { receiver: mpsc::Receiver, connection_timeout: Duration, current_reconnection: &AtomicU32, + topology_access: TopologyAccessor, + api_client: NymApiClient, + local_identity: Arc, ) { let connection_fut = TcpStream::connect(address); @@ -97,7 +115,41 @@ impl Client { debug!("Managed to establish connection to {}", address); // if we managed to connect, reset the reconnection count (whatever it might have been) current_reconnection.store(0, Ordering::Release); - Framed::new(stream, NymCodec) + //Get the topology, because we need the keys for the handshake + let topology_ref = match topology_access.current_topology().await { + Some(topology) => topology, + None => { + error!("Cannot perform Noise handshake to {address}, due to topology error"); + return; + } + }; + + let epoch_id = match api_client.get_current_epoch_id().await { + Ok(id) => id, + Err(err) => { + error!("Cannot perform Noise handshake to {address}, due to epoch id error - {err}"); + return; + } + }; + + let noise_stream = match upgrade_noise_initiator_with_topology( + stream, + Default::default(), + &topology_ref, + epoch_id, + &local_identity.public_key().to_bytes(), + &local_identity.private_key().to_bytes(), + ) + .await + { + Ok(noise_stream) => noise_stream, + Err(err) => { + error!("Failed to perform Noise handshake with {address} - {err}"); + return; + } + }; + debug!("Noise initiator handshake completed for {:?}", address); + Framed::new(noise_stream, NymCodec) } Err(err) => { debug!( @@ -175,6 +227,10 @@ impl Client { // copy the value before moving into another task let initial_connection_timeout = self.config.initial_connection_timeout; + let topology_access_clone = self.topology_access.clone(); + let api_client_clone = self.api_client.clone(); + let local_id_key = self.local_identity.clone(); + tokio::spawn(async move { // before executing the manager, wait for what was specified, if anything if let Some(backoff) = backoff { @@ -187,6 +243,9 @@ impl Client { receiver, initial_connection_timeout, ¤t_reconnection_attempt, + topology_access_clone, + api_client_clone, + local_id_key, ) .await }); @@ -253,15 +312,23 @@ impl SendWithoutResponse for Client { #[cfg(test)] mod tests { use super::*; + use rand::rngs::OsRng; + use url::Url; fn dummy_client() -> Client { - Client::new(Config { - initial_reconnection_backoff: Duration::from_millis(10_000), - maximum_reconnection_backoff: Duration::from_millis(300_000), - initial_connection_timeout: Duration::from_millis(1_500), - maximum_connection_buffer_size: 128, - use_legacy_version: false, - }) + let mut rng = OsRng; + Client::new( + Config { + initial_reconnection_backoff: Duration::from_millis(10_000), + maximum_reconnection_backoff: Duration::from_millis(300_000), + initial_connection_timeout: Duration::from_millis(1_500), + maximum_connection_buffer_size: 128, + use_legacy_version: false, + }, + TopologyAccessor::new(), + NymApiClient::new(Url::parse("http://dummy.url").unwrap()), + Arc::new(encryption::KeyPair::new(&mut rng)), + ) } #[test] diff --git a/common/client-libs/mixnet-client/src/forwarder.rs b/common/client-libs/mixnet-client/src/forwarder.rs index 630cc956632..f3357d54940 100644 --- a/common/client-libs/mixnet-client/src/forwarder.rs +++ b/common/client-libs/mixnet-client/src/forwarder.rs @@ -5,8 +5,11 @@ use crate::client::{Client, Config, SendWithoutResponse}; use futures::channel::mpsc; use futures::StreamExt; use log::*; +use nym_client_core::client::topology_control::accessor::TopologyAccessor; +use nym_crypto::asymmetric::encryption; use nym_sphinx::forwarding::packet::MixPacket; -use std::time::Duration; +use nym_validator_client::NymApiClient; +use std::sync::Arc; pub type MixForwardingSender = mpsc::UnboundedSender; type MixForwardingReceiver = mpsc::UnboundedReceiver; @@ -21,26 +24,22 @@ pub struct PacketForwarder { impl PacketForwarder { pub fn new( - initial_reconnection_backoff: Duration, - maximum_reconnection_backoff: Duration, - initial_connection_timeout: Duration, - maximum_connection_buffer_size: usize, - use_legacy_version: bool, + client_config: Config, + topology_access: TopologyAccessor, + api_client: NymApiClient, + local_identity: Arc, shutdown: nym_task::TaskClient, ) -> (PacketForwarder, MixForwardingSender) { - let client_config = Config::new( - initial_reconnection_backoff, - maximum_reconnection_backoff, - initial_connection_timeout, - maximum_connection_buffer_size, - use_legacy_version, - ); - let (packet_sender, packet_receiver) = mpsc::unbounded(); ( PacketForwarder { - mixnet_client: Client::new(client_config), + mixnet_client: Client::new( + client_config, + topology_access, + api_client, + local_identity, + ), packet_receiver, shutdown, }, diff --git a/common/topology/src/lib.rs b/common/topology/src/lib.rs index 36543b72434..39a54393c3c 100644 --- a/common/topology/src/lib.rs +++ b/common/topology/src/lib.rs @@ -186,6 +186,37 @@ impl NymTopology { None } + pub fn find_node_key_by_mix_host( + &self, + mix_host: SocketAddr, + ) -> Result, NymTopologyError> { + for node in self.described_nodes.iter() { + let sphinx_key = match node { + DescribedNymNode::Gateway(g) => &g.bond.gateway.sphinx_key, + DescribedNymNode::Mixnode(m) => &m.bond.mix_node.sphinx_key, + }; + if let Some(description) = match node { + DescribedNymNode::Gateway(g) => &g.self_described, + DescribedNymNode::Mixnode(m) => &m.self_described, + } { + if description + .host_information + .ip_address + .contains(&mix_host.ip()) + { + //we have our node + if description.noise_information.supported { + return Ok(Some(sphinx_key.to_string())); + } else { + return Ok(None); + } + } + } + } + //didn't find that node + Err(NymTopologyError::NoMixnodesAvailable) + } + pub fn find_gateway(&self, gateway_identity: IdentityKeyRef) -> Option<&gateway::Node> { self.gateways .iter() diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index 2e62aec499d..5b1712a68eb 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -80,6 +80,7 @@ nym-validator-client = { path = "../common/client-libs/validator-client" } nym-ip-packet-router = { path = "../service-providers/ip-packet-router" } nym-topology-control = { path = "../common/topology-control" } nym-topology = { path = "../common/topology" } +nym-noise = { path = "../common/nymnoise" } nym-wireguard = { path = "../common/wireguard", optional = true } defguard_wireguard_rs = { git = "https://github.com/neacsu/wireguard-rs.git", rev = "c2cd0c1119f699f4bc43f5e6ffd6fc242caa42ed", optional = true } diff --git a/gateway/src/node/mixnet_handling/receiver/connection_handler.rs b/gateway/src/node/mixnet_handling/receiver/connection_handler.rs index d07dc6489eb..f14405f65f5 100644 --- a/gateway/src/node/mixnet_handling/receiver/connection_handler.rs +++ b/gateway/src/node/mixnet_handling/receiver/connection_handler.rs @@ -9,15 +9,20 @@ use crate::node::storage::Storage; use futures::channel::mpsc::SendError; use futures::StreamExt; use log::*; +use nym_client_core::client::topology_control::accessor::TopologyAccessor; +use nym_crypto::asymmetric::encryption; use nym_mixnet_client::forwarder::MixForwardingSender; use nym_mixnode_common::packet_processor::processor::ProcessedFinalHop; +use nym_noise::upgrade_noise_responder_with_topology; use nym_sphinx::forwarding::packet::MixPacket; use nym_sphinx::framing::codec::NymCodec; use nym_sphinx::framing::packet::FramedNymPacket; use nym_sphinx::DestinationAddressBytes; use nym_task::TaskClient; +use nym_validator_client::NymApiClient; use std::collections::HashMap; use std::net::SocketAddr; +use std::sync::Arc; use thiserror::Error; use tokio::net::TcpStream; use tokio_util::codec::Framed; @@ -40,6 +45,9 @@ pub(crate) struct ConnectionHandler { active_clients_store: ActiveClientsStore, storage: St, ack_sender: MixForwardingSender, + topology_access: TopologyAccessor, + api_client: NymApiClient, + local_identity: Arc, } impl Clone for ConnectionHandler { @@ -58,6 +66,9 @@ impl Clone for ConnectionHandler { active_clients_store: self.active_clients_store.clone(), storage: self.storage.clone(), ack_sender: self.ack_sender.clone(), + topology_access: self.topology_access.clone(), + api_client: self.api_client.clone(), + local_identity: self.local_identity.clone(), } } } @@ -68,6 +79,9 @@ impl ConnectionHandler { storage: St, ack_sender: MixForwardingSender, active_clients_store: ActiveClientsStore, + topology_access: TopologyAccessor, + api_client: NymApiClient, + local_identity: Arc, ) -> Self { ConnectionHandler { packet_processor, @@ -75,6 +89,9 @@ impl ConnectionHandler { storage, active_clients_store, ack_sender, + topology_access, + api_client, + local_identity, } } @@ -205,7 +222,41 @@ impl ConnectionHandler { ) { debug!("Starting connection handler for {:?}", remote); shutdown.mark_as_success(); - let mut framed_conn = Framed::new(conn, NymCodec); + + let topology_ref = match self.topology_access.current_topology().await { + Some(topology) => topology, + None => { + error!("Cannot perform Noise handshake to {remote}, due to topology error"); + return; + } + }; + + let epoch_id = match self.api_client.get_current_epoch_id().await { + Ok(id) => id, + Err(err) => { + error!("Cannot perform Noise handshake to {remote}, due to epoch id error - {err}"); + return; + } + }; + + let noise_stream = match upgrade_noise_responder_with_topology( + conn, + Default::default(), + &topology_ref, + epoch_id, + &self.local_identity.public_key().to_bytes(), + &self.local_identity.private_key().to_bytes(), + ) + .await + { + Ok(noise_stream) => noise_stream, + Err(err) => { + error!("Failed to perform Noise handshake with {remote} - {err}"); + return; + } + }; + debug!("Noise responder handshake completed for {:?}", remote); + let mut framed_conn = Framed::new(noise_stream, NymCodec); while !shutdown.is_shutdown() { tokio::select! { biased; diff --git a/gateway/src/node/mod.rs b/gateway/src/node/mod.rs index b65a64800fb..54e8c3df032 100644 --- a/gateway/src/node/mod.rs +++ b/gateway/src/node/mod.rs @@ -36,6 +36,7 @@ use nym_topology_control::accessor::TopologyAccessor; use nym_topology_control::nym_api_provider::NymApiTopologyProvider; use nym_topology_control::TopologyRefresher; use nym_topology_control::TopologyRefresherConfig; +use nym_validator_client::NymApiClient; use nym_validator_client::{nyxd, DirectSigningHttpRpcNyxdClient}; use rand::seq::SliceRandom; use rand::thread_rng; @@ -182,6 +183,8 @@ impl Gateway { &self, ack_sender: MixForwardingSender, active_clients_store: ActiveClientsStore, + topology_access: TopologyAccessor, + api_client: NymApiClient, shutdown: TaskClient, ) where St: Storage + Clone + 'static, @@ -196,6 +199,9 @@ impl Gateway { self.storage.clone(), ack_sender, active_clients_store, + topology_access, + api_client, + Arc::clone(&self.sphinx_keypair), ); let listening_address = SocketAddr::new( @@ -249,15 +255,26 @@ impl Gateway { ); } - fn start_packet_forwarder(&self, shutdown: TaskClient) -> MixForwardingSender { + fn start_packet_forwarder( + &self, + topology_access: TopologyAccessor, + api_client: NymApiClient, + shutdown: TaskClient, + ) -> MixForwardingSender { info!("Starting mix packet forwarder..."); - let (mut packet_forwarder, packet_sender) = PacketForwarder::new( + let forwarder_config = nym_mixnet_client::client::Config::new( self.config.debug.packet_forwarding_initial_backoff, self.config.debug.packet_forwarding_maximum_backoff, self.config.debug.initial_connection_timeout, self.config.debug.maximum_connection_buffer_size, self.config.debug.use_legacy_framed_packet_version, + ); + let (mut packet_forwarder, packet_sender) = PacketForwarder::new( + forwarder_config, + topology_access, + api_client, + Arc::clone(&self.sphinx_keypair), shutdown, ); @@ -518,13 +535,20 @@ impl Gateway { ) .await; - let mix_forwarding_channel = - self.start_packet_forwarder(shutdown.subscribe().named("PacketForwarder")); + let random_api_client = self.random_api_client()?; + + let mix_forwarding_channel = self.start_packet_forwarder( + shared_topology_access.clone(), + random_api_client.clone(), + shutdown.subscribe().named("PacketForwarder"), + ); let active_clients_store = ActiveClientsStore::new(); self.start_mix_socket_listener( mix_forwarding_channel.clone(), active_clients_store.clone(), + shared_topology_access, + random_api_client, shutdown.subscribe().named("mixnet_handling::Listener"), ); diff --git a/mixnode/Cargo.toml b/mixnode/Cargo.toml index d7fca9ca140..e44f0194698 100644 --- a/mixnode/Cargo.toml +++ b/mixnode/Cargo.toml @@ -63,6 +63,7 @@ nym-topology-control = { path = "../common/topology-control" } nym-validator-client = { path = "../common/client-libs/validator-client" } nym-bin-common = { path = "../common/bin-common", features = ["output_format"] } cpu-cycles = { path = "../cpu-cycles", optional = true } +nym-noise = { path = "../common/nymnoise" } [dev-dependencies] tokio = { workspace = true, features = [ diff --git a/mixnode/src/node/listener/connection_handler/mod.rs b/mixnode/src/node/listener/connection_handler/mod.rs index c16a700bfd8..b1118526bcf 100644 --- a/mixnode/src/node/listener/connection_handler/mod.rs +++ b/mixnode/src/node/listener/connection_handler/mod.rs @@ -9,12 +9,17 @@ use crate::node::TaskClient; use futures::StreamExt; use log::debug; use log::{error, info, warn}; +use nym_client_core::client::topology_control::accessor::TopologyAccessor; +use nym_crypto::asymmetric::encryption; use nym_mixnode_common::measure; +use nym_noise::upgrade_noise_responder_with_topology; use nym_sphinx::forwarding::packet::MixPacket; use nym_sphinx::framing::codec::NymCodec; use nym_sphinx::framing::packet::FramedNymPacket; use nym_sphinx::Delay as SphinxDelay; +use nym_validator_client::NymApiClient; use std::net::SocketAddr; +use std::sync::Arc; use tokio::net::TcpStream; use tokio::time::Instant; use tokio_util::codec::Framed; @@ -28,16 +33,25 @@ pub(crate) mod packet_processing; pub(crate) struct ConnectionHandler { packet_processor: PacketProcessor, delay_forwarding_channel: PacketDelayForwardSender, + topology_access: TopologyAccessor, + api_client: NymApiClient, + local_identity: Arc, } impl ConnectionHandler { pub(crate) fn new( packet_processor: PacketProcessor, delay_forwarding_channel: PacketDelayForwardSender, + topology_access: TopologyAccessor, + api_client: NymApiClient, + local_identity: Arc, ) -> Self { ConnectionHandler { packet_processor, delay_forwarding_channel, + topology_access, + api_client, + local_identity, } } @@ -88,8 +102,43 @@ impl ConnectionHandler { mut shutdown: TaskClient, ) { debug!("Starting connection handler for {:?}", remote); + shutdown.mark_as_success(); - let mut framed_conn = Framed::new(conn, NymCodec); + + let topology_ref = match self.topology_access.current_topology().await { + Some(topology) => topology, + None => { + error!("Cannot perform Noise handshake to {remote}, due to topology error"); + return; + } + }; + + let epoch_id = match self.api_client.get_current_epoch_id().await { + Ok(id) => id, + Err(err) => { + error!("Cannot perform Noise handshake to {remote}, due to epoch id error - {err}"); + return; + } + }; + + let noise_stream = match upgrade_noise_responder_with_topology( + conn, + Default::default(), + &topology_ref, + epoch_id, + &self.local_identity.public_key().to_bytes(), + &self.local_identity.private_key().to_bytes(), + ) + .await + { + Ok(noise_stream) => noise_stream, + Err(err) => { + error!("Failed to perform Noise handshake with {remote} - {err}"); + return; + } + }; + debug!("Noise responder handshake completed for {:?}", remote); + let mut framed_conn = Framed::new(noise_stream, NymCodec); while !shutdown.is_shutdown() { tokio::select! { biased; @@ -121,10 +170,7 @@ impl ConnectionHandler { } } - info!( - "Closing connection from {:?}", - framed_conn.into_inner().peer_addr() - ); + info!("Closing connection from {:?}", remote); log::trace!("ConnectionHandler: Exiting"); } } diff --git a/mixnode/src/node/mod.rs b/mixnode/src/node/mod.rs index a990b98b4ad..541954c4241 100644 --- a/mixnode/src/node/mod.rs +++ b/mixnode/src/node/mod.rs @@ -23,6 +23,7 @@ use nym_topology_control::accessor::TopologyAccessor; use nym_topology_control::nym_api_provider::NymApiTopologyProvider; use nym_topology_control::TopologyRefresher; use nym_topology_control::TopologyRefresherConfig; +use nym_validator_client::NymApiClient; use rand::seq::SliceRandom; use rand::thread_rng; use std::net::SocketAddr; @@ -107,6 +108,8 @@ impl MixNode { &self, node_stats_update_sender: node_statistics::UpdateSender, delay_forwarding_channel: PacketDelayForwardSender, + topology_access: TopologyAccessor, + api_client: NymApiClient, shutdown: TaskClient, ) { info!("Starting socket listener..."); @@ -114,7 +117,13 @@ impl MixNode { let packet_processor = PacketProcessor::new(self.sphinx_keypair.private_key(), node_stats_update_sender); - let connection_handler = ConnectionHandler::new(packet_processor, delay_forwarding_channel); + let connection_handler = ConnectionHandler::new( + packet_processor, + delay_forwarding_channel, + topology_access, + api_client, + Arc::clone(&self.sphinx_keypair), + ); let listening_address = SocketAddr::new( self.config.mixnode.listening_address, @@ -127,6 +136,8 @@ impl MixNode { fn start_packet_delay_forwarder( &mut self, node_stats_update_sender: node_statistics::UpdateSender, + topology_access: TopologyAccessor, + api_client: NymApiClient, shutdown: TaskClient, ) -> PacketDelayForwardSender { info!("Starting packet delay-forwarder..."); @@ -140,7 +151,12 @@ impl MixNode { ); let mut packet_forwarder = DelayForwarder::new( - nym_mixnet_client::Client::new(client_config), + nym_mixnet_client::Client::new( + client_config, + topology_access, + api_client, + Arc::clone(&self.sphinx_keypair), + ), node_stats_update_sender, shutdown, ); @@ -288,13 +304,21 @@ impl MixNode { shutdown.subscribe().named("TopologyRefresher"), ) .await; + + let random_api_client = self.random_api_client(); + let delay_forwarding_channel = self.start_packet_delay_forwarder( node_stats_update_sender.clone(), + shared_topology_access.clone(), + random_api_client.clone(), shutdown.subscribe().named("DelayForwarder"), ); + self.start_socket_listener( node_stats_update_sender, delay_forwarding_channel, + shared_topology_access, + random_api_client, shutdown.subscribe().named("Listener"), ); let atomic_verloc_results = From b75a0614cc45d1d09e1f9d91e55a53c2929df7fc Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Tue, 30 Jan 2024 11:07:46 +0100 Subject: [PATCH 7/9] enable noise support in self described API --- gateway/src/http/mod.rs | 2 +- mixnode/src/node/http/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/gateway/src/http/mod.rs b/gateway/src/http/mod.rs index a7a57cfecfc..4c68e55ece8 100644 --- a/gateway/src/http/mod.rs +++ b/gateway/src/http/mod.rs @@ -261,7 +261,7 @@ impl<'a> HttpApiBuilder<'a> { self.sphinx_keypair.public_key(), self.identity_keypair, )?, - NoiseInformation { supported: false }, //this field comes with Noise support, but with PR chain, the actual support might come later + NoiseInformation { supported: true }, //Now we can enable that ) .with_gateway(load_gateway_details(self.gateway_config)?) .with_landing_page_assets(self.gateway_config.http.landing_page_assets_path.as_ref()); diff --git a/mixnode/src/node/http/mod.rs b/mixnode/src/node/http/mod.rs index 5628ffb1bd9..bc43a90c6be 100644 --- a/mixnode/src/node/http/mod.rs +++ b/mixnode/src/node/http/mod.rs @@ -94,7 +94,7 @@ impl<'a> HttpApiBuilder<'a> { self.sphinx_keypair.public_key(), self.identity_keypair, )?, - NoiseInformation { supported: false }, //this field comes with Noise support, but with PR chain, the actual support might come later + NoiseInformation { supported: true }, //Now we can enable that ) .with_mixnode(load_mixnode_details(self.mixnode_config)?) .with_landing_page_assets(self.mixnode_config.http.landing_page_assets_path.as_ref()); From 9596dd3d84b0e3e2fc9324758b8de51ac8969ec4 Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Tue, 6 Feb 2024 10:30:44 +0100 Subject: [PATCH 8/9] apply some of octlol's suggestion --- .../client-libs/mixnet-client/src/client.rs | 87 +++++++++---------- common/nymnoise/src/error.rs | 1 + common/nymnoise/src/lib.rs | 12 ++- .../receiver/connection_handler.rs | 11 +-- .../node/listener/connection_handler/mod.rs | 11 +-- 5 files changed, 57 insertions(+), 65 deletions(-) diff --git a/common/client-libs/mixnet-client/src/client.rs b/common/client-libs/mixnet-client/src/client.rs index b0f64a96ff6..7f6cbf107fc 100644 --- a/common/client-libs/mixnet-client/src/client.rs +++ b/common/client-libs/mixnet-client/src/client.rs @@ -110,55 +110,54 @@ impl Client { let connection_fut = TcpStream::connect(address); let conn = match tokio::time::timeout(connection_timeout, connection_fut).await { - Ok(stream_res) => match stream_res { - Ok(stream) => { - debug!("Managed to establish connection to {}", address); - // if we managed to connect, reset the reconnection count (whatever it might have been) - current_reconnection.store(0, Ordering::Release); - //Get the topology, because we need the keys for the handshake - let topology_ref = match topology_access.current_topology().await { - Some(topology) => topology, - None => { + Ok(stream_res) => { + match stream_res { + Ok(stream) => { + debug!("Managed to establish connection to {}", address); + // if we managed to connect, reset the reconnection count (whatever it might have been) + current_reconnection.store(0, Ordering::Release); + //Get the topology, because we need the keys for the handshake + let Some(topology) = topology_access.current_topology().await else { error!("Cannot perform Noise handshake to {address}, due to topology error"); return; - } - }; + }; - let epoch_id = match api_client.get_current_epoch_id().await { - Ok(id) => id, - Err(err) => { - error!("Cannot perform Noise handshake to {address}, due to epoch id error - {err}"); - return; - } - }; + let epoch_id = match api_client.get_current_epoch_id().await { + Ok(id) => id, + Err(err) => { + error!("Cannot perform Noise handshake to {address}, due to epoch id error - {err}"); + return; + } + }; - let noise_stream = match upgrade_noise_initiator_with_topology( - stream, - Default::default(), - &topology_ref, - epoch_id, - &local_identity.public_key().to_bytes(), - &local_identity.private_key().to_bytes(), - ) - .await - { - Ok(noise_stream) => noise_stream, - Err(err) => { - error!("Failed to perform Noise handshake with {address} - {err}"); - return; - } - }; - debug!("Noise initiator handshake completed for {:?}", address); - Framed::new(noise_stream, NymCodec) + let noise_stream = match upgrade_noise_initiator_with_topology( + stream, + Default::default(), + &topology, + epoch_id, + &local_identity.public_key().to_bytes(), + &local_identity.private_key().to_bytes(), + ) + .await + { + Ok(noise_stream) => noise_stream, + Err(err) => { + error!("Failed to perform Noise handshake with {address} - {err}"); + return; + } + }; + debug!("Noise initiator handshake completed for {:?}", address); + Framed::new(noise_stream, NymCodec) + } + Err(err) => { + debug!( + "failed to establish connection to {} (err: {})", + address, err + ); + return; + } } - Err(err) => { - debug!( - "failed to establish connection to {} (err: {})", - address, err - ); - return; - } - }, + } Err(_) => { debug!( "failed to connect to {} within {:?}", diff --git a/common/nymnoise/src/error.rs b/common/nymnoise/src/error.rs index 208c5b8ef5b..d2665eebf8a 100644 --- a/common/nymnoise/src/error.rs +++ b/common/nymnoise/src/error.rs @@ -13,6 +13,7 @@ pub enum NoiseError { #[error("encountered a Noise Protocol error - {0}")] ProtocolError(Error), + #[error("encountered an IO error - {0}")] IoError(#[from] io::Error), diff --git a/common/nymnoise/src/lib.rs b/common/nymnoise/src/lib.rs index 18d53ebc4b6..c3f26b319e4 100644 --- a/common/nymnoise/src/lib.rs +++ b/common/nymnoise/src/lib.rs @@ -54,13 +54,11 @@ pub async fn upgrade_noise_initiator_with_topology( local_private_key: &[u8], ) -> Result { //Get init material - let responder_addr = match conn.peer_addr() { - Ok(addr) => addr, - Err(err) => { - error!("Unable to extract peer address from connection - {err}"); - return Err(Error::Prereq(Prerequisite::RemotePublicKey).into()); - } - }; + let responder_addr = conn.peer_addr().map_err(|err| { + error!("Unable to extract peer address from connection - {err}"); + Error::Prereq(Prerequisite::RemotePublicKey) + })?; + let remote_pub_key = match topology.find_node_key_by_mix_host(responder_addr) { Ok(Some(key)) => encryption::PublicKey::from_base58_string(key)?.to_bytes(), Ok(None) => { diff --git a/gateway/src/node/mixnet_handling/receiver/connection_handler.rs b/gateway/src/node/mixnet_handling/receiver/connection_handler.rs index f14405f65f5..0df757835c8 100644 --- a/gateway/src/node/mixnet_handling/receiver/connection_handler.rs +++ b/gateway/src/node/mixnet_handling/receiver/connection_handler.rs @@ -223,12 +223,9 @@ impl ConnectionHandler { debug!("Starting connection handler for {:?}", remote); shutdown.mark_as_success(); - let topology_ref = match self.topology_access.current_topology().await { - Some(topology) => topology, - None => { - error!("Cannot perform Noise handshake to {remote}, due to topology error"); - return; - } + let Some(topology) = self.topology_access.current_topology().await else { + error!("Cannot perform Noise handshake to {remote}, due to topology error"); + return; }; let epoch_id = match self.api_client.get_current_epoch_id().await { @@ -242,7 +239,7 @@ impl ConnectionHandler { let noise_stream = match upgrade_noise_responder_with_topology( conn, Default::default(), - &topology_ref, + &topology, epoch_id, &self.local_identity.public_key().to_bytes(), &self.local_identity.private_key().to_bytes(), diff --git a/mixnode/src/node/listener/connection_handler/mod.rs b/mixnode/src/node/listener/connection_handler/mod.rs index b1118526bcf..b9545ceea81 100644 --- a/mixnode/src/node/listener/connection_handler/mod.rs +++ b/mixnode/src/node/listener/connection_handler/mod.rs @@ -105,12 +105,9 @@ impl ConnectionHandler { shutdown.mark_as_success(); - let topology_ref = match self.topology_access.current_topology().await { - Some(topology) => topology, - None => { - error!("Cannot perform Noise handshake to {remote}, due to topology error"); - return; - } + let Some(topology) = self.topology_access.current_topology().await else { + error!("Cannot perform Noise handshake to {remote}, due to topology error"); + return; }; let epoch_id = match self.api_client.get_current_epoch_id().await { @@ -124,7 +121,7 @@ impl ConnectionHandler { let noise_stream = match upgrade_noise_responder_with_topology( conn, Default::default(), - &topology_ref, + &topology, epoch_id, &self.local_identity.public_key().to_bytes(), &self.local_identity.private_key().to_bytes(), From 350a90d8483b2bdf19c03604d539718a8029a4f8 Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Tue, 6 Feb 2024 11:02:56 +0100 Subject: [PATCH 9/9] apply octlol's type suggestion --- .../client-libs/mixnet-client/src/client.rs | 4 +- common/nymnoise/src/lib.rs | 46 ++++++++++--------- .../receiver/connection_handler.rs | 4 +- .../node/listener/connection_handler/mod.rs | 4 +- 4 files changed, 31 insertions(+), 27 deletions(-) diff --git a/common/client-libs/mixnet-client/src/client.rs b/common/client-libs/mixnet-client/src/client.rs index 7f6cbf107fc..31dd88a0b10 100644 --- a/common/client-libs/mixnet-client/src/client.rs +++ b/common/client-libs/mixnet-client/src/client.rs @@ -135,8 +135,8 @@ impl Client { Default::default(), &topology, epoch_id, - &local_identity.public_key().to_bytes(), - &local_identity.private_key().to_bytes(), + local_identity.public_key(), + local_identity.private_key(), ) .await { diff --git a/common/nymnoise/src/lib.rs b/common/nymnoise/src/lib.rs index c3f26b319e4..ded97850e71 100644 --- a/common/nymnoise/src/lib.rs +++ b/common/nymnoise/src/lib.rs @@ -18,25 +18,27 @@ pub mod stream; pub async fn upgrade_noise_initiator( conn: TcpStream, pattern: NoisePattern, - local_public_key: Option<&[u8]>, - local_private_key: &[u8], - remote_pub_key: &[u8], + local_public_key: Option<&encryption::PublicKey>, + local_private_key: &encryption::PrivateKey, + remote_pub_key: &encryption::PublicKey, epoch: u32, ) -> Result { trace!("Perform Noise Handshake, initiator side"); //In case the local key cannot be known by the remote party, e.g. in a client-gateway connection let secret = [ - local_public_key.unwrap_or(&[]), - remote_pub_key, - &epoch.to_be_bytes(), + local_public_key + .map(|k| k.to_bytes().to_vec()) + .unwrap_or_default(), + remote_pub_key.to_bytes().to_vec(), + epoch.to_be_bytes().to_vec(), ] .concat(); let secret_hash = Sha256::digest(secret); let handshake = Builder::new(pattern.as_str().parse()?) - .local_private_key(local_private_key) - .remote_public_key(remote_pub_key) + .local_private_key(&local_private_key.to_bytes()) + .remote_public_key(&remote_pub_key.to_bytes()) .psk(pattern.psk_position(), &secret_hash) .build_initiator()?; @@ -50,8 +52,8 @@ pub async fn upgrade_noise_initiator_with_topology( pattern: NoisePattern, topology: &NymTopology, epoch: u32, - local_public_key: &[u8], - local_private_key: &[u8], + local_public_key: &encryption::PublicKey, + local_private_key: &encryption::PrivateKey, ) -> Result { //Get init material let responder_addr = conn.peer_addr().map_err(|err| { @@ -60,7 +62,7 @@ pub async fn upgrade_noise_initiator_with_topology( })?; let remote_pub_key = match topology.find_node_key_by_mix_host(responder_addr) { - Ok(Some(key)) => encryption::PublicKey::from_base58_string(key)?.to_bytes(), + Ok(Some(key)) => encryption::PublicKey::from_base58_string(key)?, Ok(None) => { warn!( "{:?} can't speak Noise yet, falling back to TCP", @@ -91,24 +93,26 @@ pub async fn upgrade_noise_initiator_with_topology( pub async fn upgrade_noise_responder( conn: TcpStream, pattern: NoisePattern, - local_public_key: &[u8], - local_private_key: &[u8], - remote_pub_key: Option<&[u8]>, + local_public_key: &encryption::PublicKey, + local_private_key: &encryption::PrivateKey, + remote_pub_key: Option<&encryption::PublicKey>, epoch: u32, ) -> Result { trace!("Perform Noise Handshake, responder side"); //If the remote_key cannot be kwnown, e.g. in a client-gateway connection let secret = [ - remote_pub_key.unwrap_or(&[]), - local_public_key, - &epoch.to_be_bytes(), + remote_pub_key + .map(|k| k.to_bytes().to_vec()) + .unwrap_or_default(), + local_public_key.to_bytes().to_vec(), + epoch.to_be_bytes().to_vec(), ] .concat(); let secret_hash = Sha256::digest(secret); let handshake = Builder::new(pattern.as_str().parse()?) - .local_private_key(local_private_key) + .local_private_key(&local_private_key.to_bytes()) .psk(pattern.psk_position(), &secret_hash) .build_responder()?; @@ -122,8 +126,8 @@ pub async fn upgrade_noise_responder_with_topology( pattern: NoisePattern, topology: &NymTopology, epoch: u32, - local_public_key: &[u8], - local_private_key: &[u8], + local_public_key: &encryption::PublicKey, + local_private_key: &encryption::PrivateKey, ) -> Result { //Get init material let initiator_addr = match conn.peer_addr() { @@ -136,7 +140,7 @@ pub async fn upgrade_noise_responder_with_topology( //SW : for private gateway, we could try to perform the handshake without that key? let remote_pub_key = match topology.find_node_key_by_mix_host(initiator_addr) { - Ok(Some(key)) => encryption::PublicKey::from_base58_string(key)?.to_bytes(), + Ok(Some(key)) => encryption::PublicKey::from_base58_string(key)?, Ok(None) => { warn!( "{:?} can't speak Noise yet, falling back to TCP", diff --git a/gateway/src/node/mixnet_handling/receiver/connection_handler.rs b/gateway/src/node/mixnet_handling/receiver/connection_handler.rs index 0df757835c8..b2c002a8a44 100644 --- a/gateway/src/node/mixnet_handling/receiver/connection_handler.rs +++ b/gateway/src/node/mixnet_handling/receiver/connection_handler.rs @@ -241,8 +241,8 @@ impl ConnectionHandler { Default::default(), &topology, epoch_id, - &self.local_identity.public_key().to_bytes(), - &self.local_identity.private_key().to_bytes(), + self.local_identity.public_key(), + self.local_identity.private_key(), ) .await { diff --git a/mixnode/src/node/listener/connection_handler/mod.rs b/mixnode/src/node/listener/connection_handler/mod.rs index b9545ceea81..796217ff58d 100644 --- a/mixnode/src/node/listener/connection_handler/mod.rs +++ b/mixnode/src/node/listener/connection_handler/mod.rs @@ -123,8 +123,8 @@ impl ConnectionHandler { Default::default(), &topology, epoch_id, - &self.local_identity.public_key().to_bytes(), - &self.local_identity.private_key().to_bytes(), + self.local_identity.public_key(), + self.local_identity.private_key(), ) .await {