Skip to content
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions common/client-core/src/client/topology_control/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ impl<'a> TopologyReadPermit<'a> {

Ok(topology)
}

pub fn try_get_raw_topology_ref(&'a self) -> Result<&'a NymTopology, NymTopologyError> {
// 1. Have we managed to get anything from the refresher, i.e. have the nym-api queries gone through?
let topology = self
.permit
.as_ref()
.ok_or(NymTopologyError::EmptyNetworkTopology)?;

Ok(topology)
}
}

impl<'a> From<RwLockReadGuard<'a, Option<NymTopology>>> for TopologyReadPermit<'a> {
Expand Down
4 changes: 2 additions & 2 deletions common/client-core/src/client/topology_control/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use tokio::time::sleep;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::sleep;

mod accessor;
pub mod accessor;
pub mod geo_aware_provider;
pub(crate) mod nym_api_provider;
pub mod nym_api_provider;

// TODO: move it to config later
const MAX_FAILURE_COUNT: usize = 10;
Expand Down
16 changes: 13 additions & 3 deletions common/client-core/src/client/topology_control/nym_api_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use rand::prelude::SliceRandom;
use rand::thread_rng;
use url::Url;

pub(crate) struct NymApiTopologyProvider {
pub struct NymApiTopologyProvider {
validator_client: nym_validator_client::client::NymApiClient,
nym_api_urls: Vec<Url>,

Expand All @@ -18,7 +18,7 @@ pub(crate) struct NymApiTopologyProvider {
}

impl NymApiTopologyProvider {
pub(crate) fn new(mut nym_api_urls: Vec<Url>, client_version: String) -> Self {
pub fn new(mut nym_api_urls: Vec<Url>, client_version: String) -> Self {
nym_api_urls.shuffle(&mut thread_rng());

NymApiTopologyProvider {
Expand Down Expand Up @@ -77,13 +77,23 @@ impl NymApiTopologyProvider {
Ok(gateways) => gateways,
};

let nodes_described = match self.validator_client.get_cached_described_nodes().await {
Err(err) => {
error!("failed to get described nodes - {err}");
return None;
}
Ok(epoch) => epoch,
};

let topology = nym_topology_from_detailed(mixnodes, gateways)
.with_described_nodes(nodes_described.clone())
.filter_system_version(&self.client_version);

if let Err(err) = self.check_layer_distribution(&topology) {
warn!("The current filtered active topology has extremely skewed layer distribution. It cannot be used: {err}");
self.use_next_nym_api();
None
let empty_topology = NymTopology::empty().with_described_nodes(nodes_described);
Some(empty_topology)
} else {
Some(topology)
}
Expand Down
6 changes: 6 additions & 0 deletions common/client-libs/validator-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,12 @@ impl NymApiClient {
Ok(self.nym_api.get_nym_nodes_described().await?)
}

pub async fn get_current_epoch_id(
&self,
) -> Result<nym_mixnet_contract_common::EpochId, ValidatorClientError> {
Ok(self.nym_api.get_current_epoch().await?.current_epoch_id())
}

pub async fn get_gateway_core_status_count(
&self,
identity: IdentityKeyRef<'_>,
Expand Down
10 changes: 9 additions & 1 deletion common/client-libs/validator-client/src/nym_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub use nym_api_requests::{
};
pub use nym_coconut_dkg_common::types::EpochId;
use nym_mixnet_contract_common::mixnode::MixNodeDetails;
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, MixId};
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, Interval, MixId};
use nym_name_service_common::response::NamesListResponse;
use nym_service_provider_directory_common::response::ServicesListResponse;

Expand Down Expand Up @@ -150,6 +150,14 @@ pub trait NymApiClientExt: ApiClient {
.await
}

async fn get_current_epoch(&self) -> Result<Interval, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::EPOCH, routes::CURRENT],
NO_PARAMS,
)
.await
}

async fn get_gateway_report(
&self,
identity: IdentityKeyRef<'_>,
Expand Down
3 changes: 3 additions & 0 deletions common/client-libs/validator-client/src/nym_api/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ pub const GATEWAYS: &str = "gateways";
pub const NYM_NODES: &str = "nym-nodes";
pub const DESCRIBED: &str = "described";

pub const EPOCH: &str = "epoch";
pub const CURRENT: &str = "current";

pub const DETAILED: &str = "detailed";
pub const DETAILED_UNFILTERED: &str = "detailed-unfiltered";
pub const ACTIVE: &str = "active";
Expand Down
25 changes: 22 additions & 3 deletions common/topology/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::str::FromStr;

#[cfg(feature = "serializable")]
use ::serde::{Deserialize, Deserializer, Serialize, Serializer};
use nym_api_requests::models::DescribedGateway;
use nym_api_requests::models::{DescribedGateway, DescribedNymNode};

pub mod error;
pub mod filter;
Expand Down Expand Up @@ -115,11 +115,29 @@ pub type MixLayer = u8;
pub struct NymTopology {
mixes: BTreeMap<MixLayer, Vec<mix::Node>>,
gateways: Vec<gateway::Node>,
described_nodes: Vec<DescribedNymNode>,
}

impl NymTopology {
pub fn new(mixes: BTreeMap<MixLayer, Vec<mix::Node>>, gateways: Vec<gateway::Node>) -> Self {
NymTopology { mixes, gateways }
NymTopology {
mixes: mixes.clone(),
gateways: gateways.clone(),
described_nodes: Vec::new(),
}
}

pub fn empty() -> Self {
NymTopology {
mixes: BTreeMap::new(),
gateways: Vec::new(),
described_nodes: Vec::new(),
}
}

pub fn with_described_nodes(mut self, described_nodes: Vec<DescribedNymNode>) -> Self {
self.described_nodes = described_nodes;
self
}

pub fn new_unordered(unordered_mixes: Vec<mix::Node>, gateways: Vec<gateway::Node>) -> Self {
Expand All @@ -130,7 +148,7 @@ impl NymTopology {
layer_entry.push(node)
}

NymTopology { mixes, gateways }
NymTopology::new(mixes, gateways)
}

#[cfg(feature = "serializable")]
Expand Down Expand Up @@ -379,6 +397,7 @@ impl NymTopology {
NymTopology {
mixes: self.mixes.filter_by_version(expected_mix_version),
gateways: self.gateways.clone(),
described_nodes: self.described_nodes.clone(),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ nym-task = { path = "../common/task" }
nym-types = { path = "../common/types" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-ip-packet-router = { path = "../service-providers/ip-packet-router" }
nym-client-core = { path = "../common/client-core"}
nym-topology = { path = "../common/topology" }

nym-wireguard = { path = "../common/wireguard", optional = true }
defguard_wireguard_rs = { git = "https://github.com/neacsu/wireguard-rs.git", rev = "c2cd0c1119f699f4bc43f5e6ffd6fc242caa42ed", optional = true }
Expand Down
37 changes: 37 additions & 0 deletions gateway/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_milli
const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500);
const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 2000;

const DEFAULT_TOPOLOGY_REFRESH_RATE: Duration = Duration::from_secs(5 * 60); // every 5min
const DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT: Duration = Duration::from_millis(5_000);

const DEFAULT_STORED_MESSAGE_FILENAME_LENGTH: u16 = 16;
const DEFAULT_MESSAGE_RETRIEVAL_LIMIT: i64 = 100;

Expand Down Expand Up @@ -109,6 +112,9 @@ pub struct Config {

#[serde(default)]
pub debug: Debug,

#[serde(default)]
pub topology: Topology,
}

impl NymConfigTemplate for Config {
Expand All @@ -135,6 +141,7 @@ impl Config {
ip_packet_router: Default::default(),
logging: Default::default(),
debug: Default::default(),
topology: Default::default(),
}
}

Expand Down Expand Up @@ -442,3 +449,33 @@ impl Default for Debug {
}
}
}

#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct Topology {
/// The uniform delay every which clients are querying the directory server
/// to try to obtain a compatible network topology to send sphinx packets through.
#[serde(with = "humantime_serde")]
pub topology_refresh_rate: Duration,

/// During topology refresh, test packets are sent through every single possible network
/// path. This timeout determines waiting period until it is decided that the packet
/// did not reach its destination.
#[serde(with = "humantime_serde")]
pub topology_resolution_timeout: Duration,

/// Specifies whether the client should not refresh the network topology after obtaining
/// the first valid instance.
/// Supersedes `topology_refresh_rate_ms`.
pub disable_refreshing: bool,
}

impl Default for Topology {
fn default() -> Self {
Topology {
topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE,
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
disable_refreshing: false,
}
}
}
3 changes: 3 additions & 0 deletions gateway/src/config/old_config_v1_1_31.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ impl From<ConfigV1_1_31> for Config {
message_retrieval_limit: value.debug.message_retrieval_limit,
use_legacy_framed_packet_version: value.debug.use_legacy_framed_packet_version,
},
// \/ ADDED
topology: Default::default(),
// /\ ADDED
}
}
}
Expand Down
61 changes: 60 additions & 1 deletion gateway/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::commands::helpers::{
override_ip_packet_router_config, override_network_requester_config,
OverrideIpPacketRouterConfig, OverrideNetworkRequesterConfig,
};
use crate::config::Config;
use crate::config::{Config, Topology};
use crate::error::GatewayError;
use crate::http::HttpApiBuilder;
use crate::node::client_handling::active_clients::ActiveClientsStore;
Expand All @@ -24,20 +24,26 @@ use anyhow::bail;
use dashmap::DashMap;
use futures::channel::{mpsc, oneshot};
use log::*;
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
use nym_client_core::client::topology_control::nym_api_provider::NymApiTopologyProvider;
use nym_client_core::client::topology_control::TopologyRefresher;
use nym_client_core::client::topology_control::TopologyRefresherConfig;
use nym_crypto::asymmetric::{encryption, identity};
use nym_mixnet_client::forwarder::{MixForwardingSender, PacketForwarder};
use nym_network_defaults::NymNetworkDetails;
use nym_network_requester::{LocalGateway, NRServiceProviderBuilder, RequestFilter};
use nym_node::wireguard::types::GatewayClientRegistry;
use nym_statistics_common::collector::StatisticsSender;
use nym_task::{TaskClient, TaskManager};
use nym_topology::provider_trait::TopologyProvider;
use nym_validator_client::{nyxd, DirectSigningHttpRpcNyxdClient};
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::error::Error;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use url::Url;

pub(crate) mod client_handling;
pub(crate) mod helpers;
Expand Down Expand Up @@ -425,6 +431,47 @@ impl<St> Gateway<St> {
.map_err(Into::into)
}

fn setup_topology_provider(nym_api_urls: Vec<Url>) -> Box<dyn TopologyProvider + Send + Sync> {
// if no custom provider was ... provided ..., create one using nym-api
Box::new(NymApiTopologyProvider::new(
nym_api_urls,
env!("CARGO_PKG_VERSION").to_string(),
))
}

// future responsible for periodically polling directory server and updating
// the current global view of topology
async fn start_topology_refresher(
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
topology_config: Topology,
topology_accessor: TopologyAccessor,
mut shutdown: TaskClient,
) {
let topology_refresher_config =
TopologyRefresherConfig::new(topology_config.topology_refresh_rate);

let mut topology_refresher = TopologyRefresher::new(
topology_refresher_config,
topology_accessor,
topology_provider,
);
// before returning, block entire runtime to refresh the current network view so that any
// components depending on topology would see a non-empty view
info!("Obtaining initial network topology");
topology_refresher.try_refresh().await;

if topology_config.disable_refreshing {
// if we're not spawning the refresher, don't cause shutdown immediately
info!("The topology refesher is not going to be started");
shutdown.mark_as_success();
} else {
// don't spawn the refresher if we don't want to be refreshing the topology.
// only use the initial values obtained
info!("Starting topology refresher...");
topology_refresher.start_with_shutdown(shutdown);
}
}

async fn check_if_bonded(&self) -> Result<bool, GatewayError> {
// TODO: if anything, this should be getting data directly from the contract
// as opposed to the validator API
Expand Down Expand Up @@ -459,6 +506,18 @@ impl<St> Gateway<St> {
CoconutVerifier::new(nyxd_client).await
};

let topology_provider = Self::setup_topology_provider(self.config.get_nym_api_endpoints());

let shared_topology_access = TopologyAccessor::new();

Self::start_topology_refresher(
topology_provider,
self.config.topology,
shared_topology_access.clone(),
shutdown.subscribe(),
)
.await;

let mix_forwarding_channel =
self.start_packet_forwarder(shutdown.subscribe().named("PacketForwarder"));

Expand Down
1 change: 1 addition & 0 deletions mixnode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ nym-pemstore = { path = "../common/pemstore", version = "0.3.0" }
nym-task = { path = "../common/task" }
nym-types = { path = "../common/types" }
nym-topology = { path = "../common/topology" }
nym-client-core = { path = "../common/client-core/" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
cpu-cycles = { path = "../cpu-cycles", optional = true }
Expand Down
Loading