14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -62,6 +62,7 @@ undocumented_unsafe_blocks = "deny"
suspicious_op_assign_impl = "allow"

[workspace.dependencies]
ahash = { version = "0.8.12", default-features = false }
anyhow = { version = "1.0.99", default-features = false }
arbitrary = "1.4.1"
async-lock = "3.4.0"
28 changes: 14 additions & 14 deletions p2p/src/simulated/network.rs
@@ -15,8 +15,8 @@ use commonware_codec::{DecodeExt, FixedSize};
use commonware_cryptography::PublicKey;
use commonware_macros::{select, select_loop};
use commonware_runtime::{
-    spawn_cell, Clock, ContextCell, Handle, Listener as _, Metrics, Network as RNetwork, Quota,
-    Spawner,
+    spawn_cell, Clock, ContextCell, Handle, HashMap, HashSet, Listener as _, Metrics,
+    Network as RNetwork, Quota, Spawner,
};
use commonware_stream::utils::codec::{recv_frame, send_frame};
use commonware_utils::{channels::ring, ordered::Set, NZUsize, TryCollect};
@@ -29,7 +29,7 @@ use prometheus_client::metrics::{counter::Counter, family::Family};
use rand::Rng;
use rand_distr::{Distribution, Normal};
use std::{
-    collections::{BTreeMap, HashMap, HashSet},
+    collections::BTreeMap,
fmt::Debug,
net::{IpAddr, Ipv4Addr, SocketAddr},
time::{Duration, SystemTime},
@@ -133,13 +133,13 @@ pub struct Network<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey>
links: HashMap<(P, P), Link>,

// A map from a public key to a peer
-    peers: BTreeMap<P, Peer<P>>,
+    peers: HashMap<P, Peer<P>>,

// Peer sets indexed by their ID
peer_sets: BTreeMap<u64, Set<P>>,

// Reference count for each peer (number of peer sets they belong to)
-    peer_refs: BTreeMap<P, usize>,
+    peer_refs: HashMap<P, usize>,

// Maximum number of peer sets to track
tracked_peer_sets: Option<usize>,
@@ -193,11 +193,11 @@ impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P>
oracle_sender: oracle_sender.clone(),
sender,
receiver,
-            links: HashMap::new(),
-            peers: BTreeMap::new(),
-            peer_sets: BTreeMap::new(),
-            peer_refs: BTreeMap::new(),
-            blocks: HashSet::new(),
+            links: HashMap::default(),
+            peers: HashMap::default(),
+            peer_sets: BTreeMap::default(),
+            peer_refs: HashMap::default(),
+            blocks: HashSet::default(),
transmitter: transmitter::State::new(),
subscribers: Vec::new(),
peer_subscribers: Vec::new(),
@@ -363,7 +363,7 @@ impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P>
.keys()
.cloned()
.try_collect()
.expect("BTreeMap keys are unique"),
.expect("HashMap keys are unique"),
));
} else {
// Return the peer set at the given index
@@ -525,13 +525,13 @@ impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P>
.keys()
.cloned()
.try_collect()
.expect("BTreeMap keys are unique")
.expect("HashMap keys are unique")
} else {
self.peer_refs
.keys()
.cloned()
.try_collect()
.expect("BTreeMap keys are unique")
.expect("HashMap keys are unique")
}
}
}
@@ -1096,7 +1096,7 @@ impl<P: PublicKey> Peer<P> {
// Spawn router
context.with_label("router").spawn(|context| async move {
// Map of channels to mailboxes (senders to particular channels)
-            let mut mailboxes = HashMap::new();
+            let mut mailboxes = HashMap::default();

// Continually listen for control messages and outbound messages
select_loop! {
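
Throughout p2p/src/simulated/network.rs above, HashMap::new() and HashSet::new() become ::default(). That follows from swapping the hasher: both std and hashbrown only provide new() for maps that use the crate's default hasher, so a map parameterized over a custom BuildHasher is constructed through Default (or with_hasher). A minimal sketch of the pattern, using BuildHasherDefault as a hypothetical stand-in for the runtime's seeded hasher:

use std::{
    collections::{hash_map::DefaultHasher, HashMap},
    hash::BuildHasherDefault,
};

// Hypothetical stand-in for the runtime's seeded `BuildHasher`.
type Seeded = BuildHasherDefault<DefaultHasher>;
type Map<K, V> = HashMap<K, V, Seeded>;

fn main() {
    // `HashMap::new()` is only defined for the default `RandomState` hasher,
    // so a map with a custom hasher is built via `Default` instead.
    let mut m: Map<&str, u32> = Map::default();
    m.insert("a", 1);
    // let m: Map<&str, u32> = Map::new(); // does not compile for this hasher type
    assert_eq!(m.get("a"), Some(&1));
}
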
2 changes: 2 additions & 0 deletions runtime/Cargo.toml
@@ -14,6 +14,7 @@ documentation = "https://docs.rs/commonware-runtime"
workspace = true

[dependencies]
ahash.workspace = true
async-lock.workspace = true
bytes.workspace = true
cfg-if.workspace = true
@@ -22,6 +23,7 @@ commonware-macros.workspace = true
commonware-utils = { workspace = true, features = ["std"] }
futures.workspace = true
governor.workspace = true
hashbrown.workspace = true
io-uring = { workspace = true, optional = true }
libc.workspace = true
opentelemetry.workspace = true
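
The two new runtime dependencies split the work: hashbrown provides HashMap/HashSet implementations that are generic over any BuildHasher, while ahash provides a fast hasher that can be seeded explicitly, making hash values (and therefore iteration order) a pure function of the seed. A small sketch of the combination (not code from either crate or from this PR):

use ahash::RandomState;

// Build a hashbrown map whose hasher is derived entirely from `seed`.
fn seeded_map(seed: u64) -> hashbrown::HashMap<u32, &'static str, RandomState> {
    hashbrown::HashMap::with_hasher(RandomState::with_seeds(seed, seed, seed, seed))
}

fn main() {
    let mut m = seeded_map(42);
    m.insert(1, "a");
    m.insert(2, "b");
    m.insert(3, "c");
    // With a fixed seed, this prints the same order on every run.
    for (k, v) in &m {
        println!("{k}: {v}");
    }
}
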
75 changes: 75 additions & 0 deletions runtime/src/deterministic.rs
@@ -37,6 +37,7 @@
//! ```

use crate::{
hash,
network::{
audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
metered::Network as MeteredNetwork,
@@ -365,6 +366,7 @@ pub struct Checkpoint {
storage: Arc<Storage>,
dns: Mutex<HashMap<String, Vec<IpAddr>>>,
catch_panics: bool,
hash_seed: u64,
}

impl Checkpoint {
@@ -393,6 +395,9 @@ impl From<Config> for Runner {

impl From<Checkpoint> for Runner {
fn from(checkpoint: Checkpoint) -> Self {
// Restore TLS hash seed for deterministic HashMap/HashSet iteration order
hash::set_seed(checkpoint.hash_seed);

Self {
state: State::Checkpoint(checkpoint),
}
@@ -404,6 +409,10 @@ impl Runner {
pub fn new(cfg: Config) -> Self {
// Ensure config is valid
cfg.assert();

// Set TLS hash seed for deterministic HashMap/HashSet iteration order
hash::set_seed(cfg.seed);

Self {
state: State::Config(cfg),
}
@@ -606,6 +615,7 @@ impl Runner {
storage,
dns: executor.dns,
catch_panics: executor.panicker.catch(),
hash_seed: hash::get_seed().expect("hash seed not set"),
};

(output, checkpoint)
@@ -1811,4 +1821,69 @@ mod tests {
assert!(iterations > 500);
});
}

#[test]
fn test_hashmap_deterministic_iteration() {
use crate::HashMap;

// Helper to create a HashMap and collect iteration order
fn collect_iteration_order(seed: u64) -> Vec<i32> {
let executor = deterministic::Runner::seeded(seed);
executor.start(|_context| async move {
let mut map: HashMap<i32, &str> = HashMap::default();
for i in 0..100 {
map.insert(i, "value");
}
map.keys().copied().collect()
})
}

// Same seed should produce same iteration order
let order1 = collect_iteration_order(42);
let order2 = collect_iteration_order(42);
assert_eq!(
order1, order2,
"same seed should produce same iteration order"
);

// Different seeds should produce different iteration order
let order3 = collect_iteration_order(12345);
assert_ne!(
order1, order3,
"different seeds should produce different iteration order"
);
}

#[test]
fn test_hashmap_deterministic_after_checkpoint() {
use crate::HashMap;

// Create a HashMap, checkpoint, and verify iteration order is preserved
let seed = 42u64;
let executor = deterministic::Runner::seeded(seed);

let (order_before, checkpoint) = executor.start_and_recover(|_context| async move {
let mut map: HashMap<i32, &str> = HashMap::default();
for i in 0..50 {
map.insert(i, "value");
}
map.keys().copied().collect::<Vec<_>>()
});

// Recover from checkpoint and create another HashMap
let executor = deterministic::Runner::from(checkpoint);
let order_after = executor.start(|_context| async move {
let mut map: HashMap<i32, &str> = HashMap::default();
for i in 0..50 {
map.insert(i, "value");
}
map.keys().copied().collect::<Vec<_>>()
});

// Both should have the same iteration order (same seed preserved through checkpoint)
assert_eq!(
order_before, order_after,
"iteration order should be preserved through checkpoint"
);
}
}
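
The hash module referenced above (hash::set_seed / hash::get_seed) and the re-exported crate::HashMap / crate::HashSet are not shown in this diff. A rough sketch of how such a module could be wired together, assuming a thread-local seed that the hasher's Default impl reads (the name SeededState and the fallback behavior are hypothetical):

use std::{cell::Cell, hash::BuildHasher};

use ahash::RandomState;

thread_local! {
    // Seed installed by the deterministic runner (from cfg.seed or a restored checkpoint).
    static SEED: Cell<Option<u64>> = Cell::new(None);
}

pub fn set_seed(seed: u64) {
    SEED.with(|s| s.set(Some(seed)));
}

pub fn get_seed() -> Option<u64> {
    SEED.with(|s| s.get())
}

// A BuildHasher whose Default impl reads the thread-local seed, so that
// HashMap::default() is deterministic per runner.
#[derive(Clone)]
pub struct SeededState(RandomState);

impl Default for SeededState {
    fn default() -> Self {
        // The real module may panic here instead of falling back to 0 when no seed is set.
        let seed = get_seed().unwrap_or(0);
        Self(RandomState::with_seeds(seed, seed, seed, seed))
    }
}

impl BuildHasher for SeededState {
    type Hasher = ahash::AHasher;

    fn build_hasher(&self) -> Self::Hasher {
        self.0.build_hasher()
    }
}

// Re-exported so call sites only see crate::HashMap / crate::HashSet.
pub type HashMap<K, V> = hashbrown::HashMap<K, V, SeededState>;
pub type HashSet<T> = hashbrown::HashSet<T, SeededState>;

Under a shape like this, Runner::new seeding the thread-local before user code runs and Checkpoint carrying hash_seed across start_and_recover is exactly what the two tests above exercise.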