18 changes: 9 additions & 9 deletions broadcast/src/buffered/engine.rs
@@ -15,7 +15,7 @@ use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
-use std::collections::{HashMap, VecDeque};
+use std::collections::{BTreeMap, VecDeque};
use tracing::{debug, error, trace, warn};

/// A responder waiting for a message.
@@ -76,7 +76,7 @@ pub struct Engine<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + D

/// Pending requests from the application.
#[allow(clippy::type_complexity)]
-waiters: HashMap<M::Commitment, Vec<Waiter<P, M::Digest, M>>>,
+waiters: BTreeMap<M::Commitment, Vec<Waiter<P, M::Digest, M>>>,

////////////////////////////////////////
// Cache
@@ -85,21 +85,21 @@ pub struct Engine<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + D
///
/// We store messages outside of the deques to minimize memory usage
/// when receiving duplicate messages.
-items: HashMap<M::Commitment, HashMap<M::Digest, M>>,
+items: BTreeMap<M::Commitment, BTreeMap<M::Digest, M>>,

/// A LRU cache of the latest received identities and digests from each peer.
///
/// This is used to limit the number of digests stored per peer.
/// At most `deque_size` digests are stored per peer. This value is expected to be small, so
/// membership checks are done in linear time.
#[allow(clippy::type_complexity)]
-deques: HashMap<P, VecDeque<Pair<M::Commitment, M::Digest>>>,
+deques: BTreeMap<P, VecDeque<Pair<M::Commitment, M::Digest>>>,

/// The number of times each digest (globally unique) exists in one of the deques.
///
/// Multiple peers can send the same message and we only want to store
/// the message once.
-counts: HashMap<M::Digest, usize>,
+counts: BTreeMap<M::Digest, usize>,

////////////////////////////////////////
// Metrics
@@ -125,10 +125,10 @@ impl<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + C
deque_size: cfg.deque_size,
codec_config: cfg.codec_config,
mailbox_receiver,
-waiters: HashMap::new(),
-deques: HashMap::new(),
-items: HashMap::new(),
-counts: HashMap::new(),
+waiters: BTreeMap::new(),
+deques: BTreeMap::new(),
+items: BTreeMap::new(),
+counts: BTreeMap::new(),
metrics,
};

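Every index the engine keeps is switched from `HashMap` to `BTreeMap`: the pending waiters, the cached items, the per-peer deques, and the reference counts. `std::collections::HashMap` iterates in an unspecified order that depends on a randomly seeded hasher, so any code path that walks these maps can return a different ordering from run to run; `BTreeMap` iterates keys in sorted order (requiring `Ord` rather than `Hash` on the key types), which makes traversal order reproducible. A minimal sketch of the difference using plain standard-library types (just an illustration, unrelated to the engine's own generics):

```rust
use std::collections::{BTreeMap, HashMap};

fn main() {
    // HashMap iteration order depends on a randomly seeded hasher, so it can
    // change from one process run to the next.
    let unordered = HashMap::from([("b", 2), ("a", 1), ("c", 3)]);
    println!("HashMap keys (order unspecified): {:?}", unordered.keys().collect::<Vec<_>>());

    // BTreeMap iterates in sorted key order, so the same inserts always yield
    // the same traversal order, run after run.
    let ordered = BTreeMap::from([("b", 2), ("a", 1), ("c", 3)]);
    assert_eq!(ordered.keys().copied().collect::<Vec<_>>(), vec!["a", "b", "c"]);
}
```

Under the deterministic runtime used in the tests, this is what lets operations that walk these maps, such as collecting all cached messages for a commitment, come back in a stable order across runs.
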
51 changes: 50 additions & 1 deletion broadcast/src/buffered/mod.rs
@@ -38,7 +38,7 @@ mod tests {
use commonware_codec::RangeCfg;
use commonware_cryptography::{
ed25519::{PrivateKey, PublicKey},
-Committable, Digestible, PrivateKeyExt as _, Signer as _,
+Committable, Digestible, Hasher, PrivateKeyExt as _, Sha256, Signer as _,
};
use commonware_macros::{select, test_traced};
use commonware_p2p::{
@@ -602,6 +602,55 @@ mod tests {
});
}

#[test_traced]
fn test_get_all_for_commitment_deterministic_order() {
let run = |seed: u64| {
let config = deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(5)));
let runner = deterministic::Runner::new(config);
runner.start(|context| async move {
let (peers, mut registrations, _oracle) =
initialize_simulation(context.clone(), 1, 1.0).await;
let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);

let sender1 = peers[0].clone();
let mut mb1 = mailboxes.get(&sender1).unwrap().clone();

// Three messages share the same commitment but have distinct digests.
let m1 = TestMessage::new(b"id", b"content-1");
let m2 = TestMessage::new(b"id", b"content-2");
let m3 = TestMessage::new(b"id", b"content-3");
mb1.broadcast(Recipients::All, m1.clone())
.await
.await
.unwrap();
mb1.broadcast(Recipients::All, m2.clone())
.await
.await
.unwrap();
mb1.broadcast(Recipients::All, m3.clone())
.await
.await
.unwrap();

let mut hasher = Sha256::default();
let values = mb1.get(None, m1.commitment(), None).await;
for value in values {
hasher.update(&value.content);
}
hasher.finalize()
})
};

for seed in 0..10 {
let h1 = run(seed);
let h2 = run(seed);

assert_eq!(h1, h2, "Messages returned in different order for {seed}");
}
}

#[test_traced]
fn test_ref_count_across_peers() {
let runner = deterministic::Runner::timed(Duration::from_secs(10));
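The new `test_get_all_for_commitment_deterministic_order` test exercises the ordering guarantee end to end: it broadcasts three messages sharing one commitment, hashes the contents returned by `get` in order with SHA-256, and requires the digest to match across two runs of the same seeded simulation. The same replay-and-compare pattern can be sketched independently of the harness; the helper below is an illustrative placeholder, not part of the crate's API:

```rust
/// Replay a seeded computation and require identical results across runs.
/// `run` stands in for any deterministic, seed-driven computation; it is an
/// illustrative placeholder, not the broadcast engine's test harness.
fn assert_replay_deterministic<T, F>(run: F)
where
    T: PartialEq + std::fmt::Debug,
    F: Fn(u64) -> T,
{
    for seed in 0..10 {
        assert_eq!(run(seed), run(seed), "output differs for seed {seed}");
    }
}

fn main() {
    // Example: a pure function of the seed is trivially deterministic.
    assert_replay_deterministic(|seed| (0..100u64).map(|i| i.wrapping_mul(seed)).sum::<u64>());
}
```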