From c8745310a8387a489d6a8701452bf7a9e9b277cb Mon Sep 17 00:00:00 2001 From: oskin1 Date: Fri, 9 Jun 2023 11:52:08 +0300 Subject: [PATCH 01/11] Partial contribution verification. --- spectrum-crypto/src/lib.rs | 17 ++++- .../src/protocol_handler/handel.rs | 64 ++++++++++++------- .../sigma_aggregation/types.rs | 60 ++++++++++++++++- 3 files changed, 115 insertions(+), 26 deletions(-) diff --git a/spectrum-crypto/src/lib.rs b/spectrum-crypto/src/lib.rs index bb699af3..45327c3b 100644 --- a/spectrum-crypto/src/lib.rs +++ b/spectrum-crypto/src/lib.rs @@ -2,7 +2,22 @@ pub mod digest; mod hash; pub mod pubkey; +/// Result of partial verification. +#[derive(Debug)] +pub enum PVResult { + Invalid, + Valid { + contribution: T, + /// `true` if contribution is only partially valid. + partially: bool, + }, +} + /// Some statement which can be verified against public data `P`. -pub trait VerifiableAgainst

{ +pub trait VerifiableAgainst

: Sized { + /// Verifies the statement. fn verify(&self, public_data: &P) -> bool; + /// Verifies the statement and returns `Some(valid_part)` in case partial verification succeded. + /// Returns `None` otherwise. + fn verify_part(self, public_data: &P) -> PVResult; } diff --git a/spectrum-network/src/protocol_handler/handel.rs b/spectrum-network/src/protocol_handler/handel.rs index 4e41a61c..16b26fbd 100644 --- a/spectrum-network/src/protocol_handler/handel.rs +++ b/spectrum-network/src/protocol_handler/handel.rs @@ -10,10 +10,10 @@ use async_std::task::sleep; use either::{Either, Left, Right}; use futures::FutureExt; use libp2p::PeerId; +use log::trace; use algebra_core::CommutativePartialSemigroup; -use log::trace; -use spectrum_crypto::VerifiableAgainst; +use spectrum_crypto::{PVResult, VerifiableAgainst}; use crate::protocol_handler::handel::message::HandelMessage; use crate::protocol_handler::handel::partitioning::{PeerIx, PeerOrd, PeerPartitions}; @@ -263,28 +263,41 @@ where } } // Verify aggregate contributions - for sc in scored_contributions.into_iter() { - if sc.contribution.verify(&self.public_data) { - let Verified(best_contrib) = &lvl.best_contribution; - if sc.score > best_contrib.score { - trace!( - "{:?} set NEW best contribution score: {}", - self.own_peer_ix, - sc.score - ); - lvl.best_contribution = Verified(sc.into()); + for ScoredContributionTraced { + score, + sender_id, + contribution, + } in scored_contributions.into_iter() + { + match contribution.verify_part(&self.public_data) { + PVResult::Invalid => { + // Ban peer, shrink scoring window. + trace!("[Handel] run_aggr: {:?} BANNED", sender_id); + self.byzantine_nodes.insert(sender_id); + let shrinked_window = self + .scoring_window + .saturating_div(self.conf.window_shrinking_factor); + self.scoring_window = max(shrinked_window, 1); + } + PVResult::Valid { + contribution, + partially, + } => { + // Recalculate score if needed. + let updated_score = if partially { contribution.weight() } else { score }; + let Verified(best_contrib) = &lvl.best_contribution; + if updated_score > best_contrib.score { + trace!( + "{:?} set NEW best contribution score: {}", + self.own_peer_ix, + updated_score + ); + lvl.best_contribution = Verified(ScoredContribution { score, contribution }); + } + self.scoring_window = self + .scoring_window + .saturating_mul(self.conf.window_shrinking_factor); } - self.scoring_window = self - .scoring_window - .saturating_mul(self.conf.window_shrinking_factor); - } else { - // Ban peer, shrink scoring window. - trace!("[Handel] run_aggr: {:?} BANNED", sc.sender_id); - self.byzantine_nodes.insert(sc.sender_id); - let shrinked_window = self - .scoring_window - .saturating_div(self.conf.window_shrinking_factor); - self.scoring_window = max(shrinked_window, 1); } } let Verified(best_contrib) = &lvl.best_contribution; @@ -730,7 +743,7 @@ mod tests { use libp2p::{Multiaddr, PeerId}; use algebra_core::CommutativePartialSemigroup; - use spectrum_crypto::VerifiableAgainst; + use spectrum_crypto::{PVResult, VerifiableAgainst}; use crate::protocol_handler::handel::partitioning::tests::FakePartitions; use crate::protocol_handler::handel::partitioning::{ @@ -758,6 +771,9 @@ mod tests { fn verify(&self, proposition: &()) -> bool { true } + fn verify_part(self, public_data: &()) -> PVResult { + PVResult::Valid(self) + } } const CONF: HandelConfig = HandelConfig { diff --git a/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs b/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs index e3afd4da..ba9140fe 100644 --- a/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs +++ b/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize}; use algebra_core::CommutativePartialSemigroup; use spectrum_crypto::digest::{blake2b256_hash, Blake2bDigest256}; use spectrum_crypto::pubkey::PublicKey; -use spectrum_crypto::VerifiableAgainst; +use spectrum_crypto::{PVResult, VerifiableAgainst}; use crate::protocol_handler::handel::partitioning::PeerIx; use crate::protocol_handler::handel::Weighted; @@ -139,6 +139,12 @@ impl VerifiableAgainst<()> for PreCommitments { fn verify(&self, _: &()) -> bool { true } + fn verify_part(self, _: &()) -> PVResult { + PVResult::Valid { + contribution: self, + partially: false, + } + } } pub struct CommitmentsVerifInput { @@ -183,6 +189,32 @@ impl VerifiableAgainst for CommitmentsWithProofs { } }) } + fn verify_part(self, public_data: &CommitmentsVerifInput) -> PVResult { + let contrib_len = self.0.len(); + let mut aggr = HashMap::new(); + let mut missing_parts = 0; + for (i, (commitment, sig)) in self.0 { + if let Some(pre_commitment) = public_data.pre_commitments.0.get(&i) { + let vk = VerifyingKey::from(commitment.clone()); + let verified = *pre_commitment == blake2b256_hash(&*commitment.as_bytes()) + && vk.verify(&public_data.message_digest_bytes, &sig.0).is_ok(); + if !verified { + return PVResult::Invalid; + } + aggr.insert(i, (commitment, sig)); + } else { + missing_parts += 1; + } + if missing_parts * 2 > contrib_len { + // More than 50% of aggregate is invalid. + return PVResult::Invalid; + } + } + PVResult::Valid { + contribution: Contributions(aggr), + partially: missing_parts > 0, + } + } } pub type Responses = Contributions; @@ -238,6 +270,32 @@ impl VerifiableAgainst for Responses { .unwrap_or(false) }) } + fn verify_part(self, public_data: &ResponsesVerifInput) -> PVResult { + let c = &public_data.challenge; + let contrib_len = self.0.len(); + let mut aggr = HashMap::new(); + let mut missing_parts = 0; + for (k, zi) in self.0 { + if let Some(input) = public_data.inputs.get(&k) { + let ai = &input.individual_input.into(); + let verified = verify_response(&zi, ai, c, input.commitment.clone(), input.pk.clone()); + if !verified { + return PVResult::Invalid; + } + aggr.insert(k, zi); + } else { + missing_parts += 1; + } + if missing_parts * 2 > contrib_len { + // More than 50% of aggregate is invalid. + return PVResult::Invalid; + } + } + PVResult::Valid { + contribution: Contributions(aggr), + partially: missing_parts > 0, + } + } } #[cfg(test)] From 770a37d0cd0685a4d06d36b84ee5364607674a28 Mon Sep 17 00:00:00 2001 From: oskin1 Date: Fri, 9 Jun 2023 11:54:34 +0300 Subject: [PATCH 02/11] Cleanup. --- spectrum-network/src/protocol_handler.rs | 2 +- spectrum-network/src/protocol_handler/sigma_aggregation.rs | 4 +--- .../src/protocol_handler/sigma_aggregation/crypto.rs | 1 - .../src/protocol_handler/sigma_aggregation/message.rs | 1 - 4 files changed, 2 insertions(+), 6 deletions(-) diff --git a/spectrum-network/src/protocol_handler.rs b/spectrum-network/src/protocol_handler.rs index f0c30389..696753a2 100644 --- a/spectrum-network/src/protocol_handler.rs +++ b/spectrum-network/src/protocol_handler.rs @@ -1,10 +1,10 @@ -use ::void::Void; use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; +use ::void::Void; use either::Either; use futures::channel::mpsc; use futures::channel::mpsc::Receiver; diff --git a/spectrum-network/src/protocol_handler/sigma_aggregation.rs b/spectrum-network/src/protocol_handler/sigma_aggregation.rs index 1706e94d..2e3f09a6 100644 --- a/spectrum-network/src/protocol_handler/sigma_aggregation.rs +++ b/spectrum-network/src/protocol_handler/sigma_aggregation.rs @@ -10,12 +10,11 @@ use futures::Stream; use higher::Bifunctor; use k256::{Scalar, SecretKey}; use libp2p::{Multiaddr, PeerId}; - use log::trace; + use spectrum_crypto::digest::Digest256; use spectrum_crypto::pubkey::PublicKey; -use crate::protocol::SIGMA_AGGR_PROTOCOL_ID; use crate::protocol_handler::aggregation::AggregationAction; use crate::protocol_handler::handel::partitioning::{MakePeerPartitions, PeerIx, PeerPartitions}; use crate::protocol_handler::handel::{Handel, HandelConfig, HandelRound}; @@ -33,7 +32,6 @@ use crate::protocol_handler::sigma_aggregation::types::{ use crate::protocol_handler::void::VoidMessage; use crate::protocol_handler::ProtocolBehaviour; use crate::protocol_handler::ProtocolBehaviourOut; -use crate::types::ProtocolId; mod crypto; mod message; diff --git a/spectrum-network/src/protocol_handler/sigma_aggregation/crypto.rs b/spectrum-network/src/protocol_handler/sigma_aggregation/crypto.rs index 27a7a411..5a660044 100644 --- a/spectrum-network/src/protocol_handler/sigma_aggregation/crypto.rs +++ b/spectrum-network/src/protocol_handler/sigma_aggregation/crypto.rs @@ -14,7 +14,6 @@ use k256::{ProjectivePoint, Scalar, SecretKey}; use spectrum_crypto::digest::{blake2b256_hash, Blake2bDigest256, Digest256}; use spectrum_crypto::pubkey::PublicKey; -use crate::protocol_handler::handel::partitioning::PeerIx; use crate::protocol_handler::handel::Threshold; use crate::protocol_handler::sigma_aggregation::types::{ AggregateCommitment, Commitment, CommitmentSecret, Signature, diff --git a/spectrum-network/src/protocol_handler/sigma_aggregation/message.rs b/spectrum-network/src/protocol_handler/sigma_aggregation/message.rs index a8d4ec9e..4f63c982 100644 --- a/spectrum-network/src/protocol_handler/sigma_aggregation/message.rs +++ b/spectrum-network/src/protocol_handler/sigma_aggregation/message.rs @@ -1,4 +1,3 @@ -use libp2p::PeerId; use serde::{Deserialize, Serialize}; use crate::protocol_handler::handel::message::HandelMessage; From 5805b1830d03c127f714eeec357f1b0cb81f3fa8 Mon Sep 17 00:00:00 2001 From: oskin1 Date: Fri, 9 Jun 2023 11:57:09 +0300 Subject: [PATCH 03/11] Fix ambiguity in naming. --- spectrum-network/src/protocol_handler/handel.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/spectrum-network/src/protocol_handler/handel.rs b/spectrum-network/src/protocol_handler/handel.rs index 16b26fbd..657bd079 100644 --- a/spectrum-network/src/protocol_handler/handel.rs +++ b/spectrum-network/src/protocol_handler/handel.rs @@ -284,15 +284,18 @@ where partially, } => { // Recalculate score if needed. - let updated_score = if partially { contribution.weight() } else { score }; + let score = if partially { contribution.weight() } else { score }; let Verified(best_contrib) = &lvl.best_contribution; - if updated_score > best_contrib.score { + if score > best_contrib.score { trace!( "{:?} set NEW best contribution score: {}", self.own_peer_ix, - updated_score + score ); - lvl.best_contribution = Verified(ScoredContribution { score, contribution }); + lvl.best_contribution = Verified(ScoredContribution { + score, + contribution, + }); } self.scoring_window = self .scoring_window From 8f951da4d179e29777b9de53efd263f9b052210c Mon Sep 17 00:00:00 2001 From: oskin1 Date: Fri, 9 Jun 2023 12:01:45 +0300 Subject: [PATCH 04/11] Fix docs. --- spectrum-crypto/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spectrum-crypto/src/lib.rs b/spectrum-crypto/src/lib.rs index 45327c3b..c7171603 100644 --- a/spectrum-crypto/src/lib.rs +++ b/spectrum-crypto/src/lib.rs @@ -17,7 +17,7 @@ pub enum PVResult { pub trait VerifiableAgainst

: Sized { /// Verifies the statement. fn verify(&self, public_data: &P) -> bool; - /// Verifies the statement and returns `Some(valid_part)` in case partial verification succeded. - /// Returns `None` otherwise. + /// Verifies the statement and returns valid part in case partial verification succeded. + /// Returns `PVResult::Invalid` otherwise. fn verify_part(self, public_data: &P) -> PVResult; } From d8d1687857a6f045a4e6c2a8a9a1567d6b021e43 Mon Sep 17 00:00:00 2001 From: oskin1 Date: Fri, 9 Jun 2023 13:38:40 +0300 Subject: [PATCH 05/11] Use partial verification exclusively. --- spectrum-crypto/src/lib.rs | 4 +- .../src/protocol_handler/handel.rs | 44 +++++++++---------- .../sigma_aggregation/types.rs | 33 ++------------ 3 files changed, 26 insertions(+), 55 deletions(-) diff --git a/spectrum-crypto/src/lib.rs b/spectrum-crypto/src/lib.rs index c7171603..f8721d06 100644 --- a/spectrum-crypto/src/lib.rs +++ b/spectrum-crypto/src/lib.rs @@ -15,9 +15,7 @@ pub enum PVResult { /// Some statement which can be verified against public data `P`. pub trait VerifiableAgainst

: Sized { - /// Verifies the statement. - fn verify(&self, public_data: &P) -> bool; /// Verifies the statement and returns valid part in case partial verification succeded. /// Returns `PVResult::Invalid` otherwise. - fn verify_part(self, public_data: &P) -> PVResult; + fn verify(self, public_data: &P) -> PVResult; } diff --git a/spectrum-network/src/protocol_handler/handel.rs b/spectrum-network/src/protocol_handler/handel.rs index 657bd079..faa1101a 100644 --- a/spectrum-network/src/protocol_handler/handel.rs +++ b/spectrum-network/src/protocol_handler/handel.rs @@ -213,18 +213,21 @@ where while let Some(c) = lvl.prioritized_contributions.pop() { // Verify individual contribution first if let Some(ic) = c.individual_contribution { - if ic.verify(&self.public_data) { - lvl.individual_contributions.push(Verified(ic)); - } else { - // Ban peer, shrink scoring window, skip scoring and - // verification of aggregate contribution from this peer. - trace!("[Handel] run_aggr: {:?} BANNED", c.sender_id); - self.byzantine_nodes.insert(c.sender_id); - let shrinked_window = self - .scoring_window - .saturating_div(self.conf.window_shrinking_factor); - self.scoring_window = max(shrinked_window, 1); - continue; + match ic.verify(&self.public_data) { + PVResult::Invalid => { + // Ban peer, shrink scoring window, skip scoring and + // verification of aggregate contribution from this peer. + trace!("[Handel] run_aggr: {:?} BANNED", c.sender_id); + self.byzantine_nodes.insert(c.sender_id); + let shrinked_window = self + .scoring_window + .saturating_div(self.conf.window_shrinking_factor); + self.scoring_window = max(shrinked_window, 1); + continue; + } + PVResult::Valid { contribution: ic, .. } => { + lvl.individual_contributions.push(Verified(ic)); + } } } // Score aggregate contribution @@ -269,7 +272,7 @@ where contribution, } in scored_contributions.into_iter() { - match contribution.verify_part(&self.public_data) { + match contribution.verify(&self.public_data) { PVResult::Invalid => { // Ban peer, shrink scoring window. trace!("[Handel] run_aggr: {:?} BANNED", sender_id); @@ -292,10 +295,7 @@ where self.own_peer_ix, score ); - lvl.best_contribution = Verified(ScoredContribution { - score, - contribution, - }); + lvl.best_contribution = Verified(ScoredContribution { score, contribution }); } self.scoring_window = self .scoring_window @@ -771,11 +771,11 @@ mod tests { } impl VerifiableAgainst<()> for Contrib { - fn verify(&self, proposition: &()) -> bool { - true - } - fn verify_part(self, public_data: &()) -> PVResult { - PVResult::Valid(self) + fn verify(self, public_data: &()) -> PVResult { + PVResult::Valid { + contribution: self, + partially: false, + } } } diff --git a/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs b/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs index ba9140fe..e595b802 100644 --- a/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs +++ b/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs @@ -136,10 +136,7 @@ impl Weighted for Contributions { pub type PreCommitments = Contributions; impl VerifiableAgainst<()> for PreCommitments { - fn verify(&self, _: &()) -> bool { - true - } - fn verify_part(self, _: &()) -> PVResult { + fn verify(self, _: &()) -> PVResult { PVResult::Valid { contribution: self, partially: false, @@ -178,18 +175,7 @@ impl From for Vec { pub type CommitmentsWithProofs = Contributions<(Commitment, Signature)>; impl VerifiableAgainst for CommitmentsWithProofs { - fn verify(&self, public_data: &CommitmentsVerifInput) -> bool { - self.0.iter().all(|(i, (commitment, sig))| { - if let Some(pre_commitment) = public_data.pre_commitments.0.get(&i) { - let vk = VerifyingKey::from(commitment.clone()); - *pre_commitment == blake2b256_hash(&*commitment.as_bytes()) - && vk.verify(&public_data.message_digest_bytes, &sig.0).is_ok() - } else { - false - } - }) - } - fn verify_part(self, public_data: &CommitmentsVerifInput) -> PVResult { + fn verify(self, public_data: &CommitmentsVerifInput) -> PVResult { let contrib_len = self.0.len(); let mut aggr = HashMap::new(); let mut missing_parts = 0; @@ -257,20 +243,7 @@ struct ResponseVerifInput { } impl VerifiableAgainst for Responses { - fn verify(&self, public_data: &ResponsesVerifInput) -> bool { - let c = &public_data.challenge; - self.0.iter().all(|(k, zi)| { - public_data - .inputs - .get(&k) - .map(|input| { - let ai = &input.individual_input.into(); - verify_response(zi, ai, c, input.commitment.clone(), input.pk.clone()) - }) - .unwrap_or(false) - }) - } - fn verify_part(self, public_data: &ResponsesVerifInput) -> PVResult { + fn verify(self, public_data: &ResponsesVerifInput) -> PVResult { let c = &public_data.challenge; let contrib_len = self.0.len(); let mut aggr = HashMap::new(); From 4f70e7ebe590c22bfdca4b6d3fbcc30225926e80 Mon Sep 17 00:00:00 2001 From: oskin1 Date: Fri, 9 Jun 2023 17:32:21 +0300 Subject: [PATCH 06/11] Relax verification. --- .../src/protocol_handler/sigma_aggregation/types.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs b/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs index e595b802..bc4b69da 100644 --- a/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs +++ b/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs @@ -191,7 +191,7 @@ impl VerifiableAgainst for CommitmentsWithProofs { } else { missing_parts += 1; } - if missing_parts * 2 > contrib_len { + if contrib_len > 1 && missing_parts * 2 > contrib_len { // More than 50% of aggregate is invalid. return PVResult::Invalid; } @@ -259,7 +259,7 @@ impl VerifiableAgainst for Responses { } else { missing_parts += 1; } - if missing_parts * 2 > contrib_len { + if contrib_len > 1 && missing_parts * 2 > contrib_len { // More than 50% of aggregate is invalid. return PVResult::Invalid; } From 9eb566949ec98d4d6e35ca5d9a46b1d8f7014006 Mon Sep 17 00:00:00 2001 From: oskin1 Date: Fri, 9 Jun 2023 18:11:27 +0300 Subject: [PATCH 07/11] Weaken response verification. --- .../src/protocol_handler/sigma_aggregation/types.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs b/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs index bc4b69da..8e61c479 100644 --- a/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs +++ b/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs @@ -252,10 +252,9 @@ impl VerifiableAgainst for Responses { if let Some(input) = public_data.inputs.get(&k) { let ai = &input.individual_input.into(); let verified = verify_response(&zi, ai, c, input.commitment.clone(), input.pk.clone()); - if !verified { - return PVResult::Invalid; + if verified { + aggr.insert(k, zi); } - aggr.insert(k, zi); } else { missing_parts += 1; } From b7ff35f387982a40a929f38b0da1ddab8b491221 Mon Sep 17 00:00:00 2001 From: Tim Ling <791016+kettlebell@users.noreply.github.com> Date: Wed, 14 Jun 2023 20:42:36 +1000 Subject: [PATCH 08/11] Update test for Byzantine nodes --- .../src/protocol_handler/handel.rs | 40 ++++++--- .../src/protocol_handler/sigma_aggregation.rs | 37 +++++++- .../sigma_aggregation/types.rs | 6 ++ .../tests/integration_tests/aggregation.rs | 87 ++++++++++--------- .../tests/integration_tests/mod.rs | 80 +++++++++++++++-- 5 files changed, 186 insertions(+), 64 deletions(-) diff --git a/spectrum-network/src/protocol_handler/handel.rs b/spectrum-network/src/protocol_handler/handel.rs index faa1101a..9921f5a1 100644 --- a/spectrum-network/src/protocol_handler/handel.rs +++ b/spectrum-network/src/protocol_handler/handel.rs @@ -217,7 +217,7 @@ where PVResult::Invalid => { // Ban peer, shrink scoring window, skip scoring and // verification of aggregate contribution from this peer. - trace!("[Handel] run_aggr: {:?} BANNED", c.sender_id); + trace!("[Handel] run_aggr(indiv): {:?} BANNED", c.sender_id); self.byzantine_nodes.insert(c.sender_id); let shrinked_window = self .scoring_window @@ -238,7 +238,8 @@ where Some(aggr) => { let score = aggr.weight(); trace!( - "{:?} successful contribution (weight: {} ", + "{:?} successful contribution {:?} (weight: {} ", + aggr, self.own_peer_ix, score ); @@ -275,7 +276,7 @@ where match contribution.verify(&self.public_data) { PVResult::Invalid => { // Ban peer, shrink scoring window. - trace!("[Handel] run_aggr: {:?} BANNED", sender_id); + trace!("[Handel] run_aggr (aggr): {:?} BANNED", sender_id); self.byzantine_nodes.insert(sender_id); let shrinked_window = self .scoring_window @@ -287,13 +288,15 @@ where partially, } => { // Recalculate score if needed. - let score = if partially { contribution.weight() } else { score }; + let score = contribution.weight(); let Verified(best_contrib) = &lvl.best_contribution; if score > best_contrib.score { trace!( - "{:?} set NEW best contribution score: {}", + "{:?}: set NEW best contribution {:?}, score: {}, partially: {}", self.own_peer_ix, - score + contribution, + score, + partially ); lvl.best_contribution = Verified(ScoredContribution { score, contribution }); } @@ -304,9 +307,17 @@ where } } let Verified(best_contrib) = &lvl.best_contribution; + let score = best_contrib.contribution.weight(); + let best_contrib = best_contrib.clone(); if is_complete(&best_contrib.contribution, level, self.conf.threshold) { lvl.completed(); - trace!("{:?}: RFP @ level {}", self.own_peer_ix, level); + trace!( + "{:?}: RFP @ level {}, contribution: {:?}, score: {}", + self.own_peer_ix, + level, + best_contrib, + score, + ); self.run_fast_path(level); self.try_activate_level(level + 1); } @@ -321,6 +332,7 @@ where let peers_at_level = self.peer_partitions.peers_at_level(level, PeerOrd::VP); if peers_at_level.is_empty() { // This level is empty, skip it + trace!("Level {} is empty! peers_at_level(VP): {:?}******************************************************", level, peers_at_level); self.levels[level] = Some(ActiveLevel::unit(prev_level.best_contribution.clone())); self.try_activate_level(level + 1); } else { @@ -667,10 +679,11 @@ where .is_ok() { trace!( - "[Handel] {:?}: contribution from {:?} @ level {}", + "[Handel] {:?}: contribution from {:?} @ level {}, receiver_level_complete: {}", self.own_peer_ix, self.peer_partitions.try_index_peer(peer_id).unwrap(), - msg.level + msg.level, + !msg.contact_sender, ); self.run_aggregation(msg.level as usize); } @@ -746,6 +759,7 @@ mod tests { use libp2p::{Multiaddr, PeerId}; use algebra_core::CommutativePartialSemigroup; + use log::trace; use spectrum_crypto::{PVResult, VerifiableAgainst}; use crate::protocol_handler::handel::partitioning::tests::FakePartitions; @@ -917,7 +931,7 @@ mod tests { let handel = make_handel(own_peer, peers.clone(), my_contrib.clone()); let own_peer_ix = handel.peer_partitions.try_index_peer(own_peer).unwrap(); - println!("Partition for {:?}------------------------", own_peer_ix); + trace!("Partition for {:?}------------------------", own_peer_ix); for level in 0..handel.peer_partitions.num_levels() { dbg!((level, &handel.peer_partitions.peers_at_level(level, PeerOrd::VP))); } @@ -928,7 +942,7 @@ mod tests { let mut num_messages_sent = 0; // run dissemination loop { - println!("PASS {} ****************************************", counter); + trace!("PASS {} ****************************************", counter); let mut messages = vec![]; for i in 0..nodes.len() { let (from_peer_id, handel) = nodes.get_mut(i).unwrap(); @@ -974,9 +988,9 @@ mod tests { for (_, handel) in nodes { let result = handel.get_complete_aggregate().unwrap(); - println!("{:?} contribution: {:?}", handel.own_peer_ix, result); + trace!("{:?} contribution: {:?}", handel.own_peer_ix, result); } - println!("PASSED. # messages sent: {}", num_messages_sent); + trace!("PASSED. # messages sent: {}", num_messages_sent); } } diff --git a/spectrum-network/src/protocol_handler/sigma_aggregation.rs b/spectrum-network/src/protocol_handler/sigma_aggregation.rs index 2e3f09a6..e4a80b48 100644 --- a/spectrum-network/src/protocol_handler/sigma_aggregation.rs +++ b/spectrum-network/src/protocol_handler/sigma_aggregation.rs @@ -33,6 +33,8 @@ use crate::protocol_handler::void::VoidMessage; use crate::protocol_handler::ProtocolBehaviour; use crate::protocol_handler::ProtocolBehaviourOut; +use super::handel::Threshold; + mod crypto; mod message; pub mod types; @@ -119,12 +121,13 @@ where fn complete( self, pre_commitments: PreCommitments, - handel_conf: HandelConfig, + mut handel_conf: HandelConfig, ) -> AggregateSchnorrCommitments<'a, H, PP> { let verif_input = CommitmentsVerifInput { pre_commitments, message_digest_bytes: self.message_digest.as_ref().to_vec(), }; + handel_conf.threshold = Threshold { num: 2, denom: 3 }; AggregateSchnorrCommitments { host_sk: self.host_sk, host_ix: self.host_ix, @@ -172,7 +175,7 @@ where fn complete( self, commitments_with_proofs: CommitmentsWithProofs, - handel_conf: HandelConfig, + mut handel_conf: HandelConfig, ) -> AggregateResponses<'a, H, PP> { // Need to ensure stable ordering for committee and individual inputs. Just sort by PeerIx. let mut committee = self.committee.clone().into_iter().collect::>(); @@ -205,6 +208,7 @@ where self.individual_inputs.clone(), challenge, ); + handel_conf.threshold = Threshold { num: 2, denom: 3 }; AggregateResponses { host_sk: self.host_sk, host_ix: self.host_ix, @@ -423,6 +427,19 @@ where continue; } Either::Right(pre_commitments) => { + let mut missing_peers: Vec<_> = (0_usize..16).collect(); + let peers = pre_commitments + .entries() + .into_iter() + .map(|(key, _)| key.unwrap()) + .collect::>(); + for i in peers { + if let Some(ix) = missing_peers.iter().position(|j| *j == i) { + missing_peers.remove(ix); + } + } + missing_peers.sort(); + println!("{:?}: PreComm missing: {:?}", st.host_ix, missing_peers); let host_ix = st.host_ix; self.task = Some(AggregationTask { state: AggregationState::AggregateSchnorrCommitments( @@ -459,6 +476,19 @@ where Either::Right(commitments) => { trace!("[SA] Got commitments"); + let mut missing_peers: Vec<_> = (0_usize..16).collect(); + let peers = commitments + .entries() + .into_iter() + .map(|(key, _)| key.unwrap()) + .collect::>(); + for i in peers { + if let Some(ix) = missing_peers.iter().position(|j| *j == i) { + missing_peers.remove(ix); + } + } + missing_peers.sort(); + println!("{:?}: Comm missing: {:?}", st.host_ix, missing_peers); self.task = Some(AggregationTask { state: AggregationState::AggregateResponses( st.complete(commitments, self.handel_conf), @@ -493,9 +523,10 @@ where } Either::Right(responses) => { self.task = None; + let host_ix = st.host_ix; let res = st.complete(responses); // todo: support error case. - trace!("[SA] Got responses: {:?}", res); + trace!("{:?}: [SA] Got responses: {:?}", host_ix, res); if channel.send(Ok(res)).is_err() { // warn here. } diff --git a/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs b/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs index 8e61c479..a7999cf6 100644 --- a/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs +++ b/spectrum-network/src/protocol_handler/sigma_aggregation/types.rs @@ -7,6 +7,7 @@ use k256::elliptic_curve::sec1::ToEncodedPoint; use k256::schnorr::signature::*; use k256::schnorr::VerifyingKey; use k256::{ProjectivePoint, Scalar, SecretKey}; +use log::trace; use serde::{Deserialize, Serialize}; use algebra_core::CommutativePartialSemigroup; @@ -254,6 +255,11 @@ impl VerifiableAgainst for Responses { let verified = verify_response(&zi, ai, c, input.commitment.clone(), input.pk.clone()); if verified { aggr.insert(k, zi); + } else { + trace!( + "Failed to verify contribution from {:?} **************************8", + k + ); } } else { missing_parts += 1; diff --git a/spectrum-network/tests/integration_tests/aggregation.rs b/spectrum-network/tests/integration_tests/aggregation.rs index 576d2864..be5b2154 100644 --- a/spectrum-network/tests/integration_tests/aggregation.rs +++ b/spectrum-network/tests/integration_tests/aggregation.rs @@ -27,27 +27,25 @@ use spectrum_network::protocol::{ use spectrum_network::protocol_api::ProtocolMailbox; use spectrum_network::protocol_handler::aggregation::AggregationAction; use spectrum_network::protocol_handler::handel::partitioning::{ - MakeBinomialPeerPartitions, PseudoRandomGenPerm, + MakeBinomialPeerPartitions, PeerIx, PseudoRandomGenPerm, }; use spectrum_network::protocol_handler::handel::{HandelConfig, Threshold}; -use spectrum_network::protocol_handler::sigma_aggregation::{SigmaAggregation}; +use spectrum_network::protocol_handler::sigma_aggregation::SigmaAggregation; use spectrum_network::protocol_handler::ProtocolHandler; use spectrum_network::types::{ProtocolVer, Reputation}; -pub struct Peer { - pub peer_id: PeerId, - pub peer_addr: Multiaddr, - pub peer_pk: k256::PublicKey, - pub peer_handle: AbortHandle, - pub aggr_handler_mailbox: Sender>, -} - pub fn k256_to_libsecp256k1(secret_key: SecretKey) -> identity::secp256k1::SecretKey { identity::secp256k1::SecretKey::try_from_bytes(secret_key.to_bytes().as_mut_slice()).unwrap() } -pub fn setup_nodes(n: usize) -> Vec { +pub fn setup_nodes<'de>( + n: usize, + threshold: Threshold, +) -> (Vec>, MakeBinomialPeerPartitions) { let mut rng = OsRng; + let seed = [0_u8; 32]; + let gen_perm = PseudoRandomGenPerm::new(seed); + let gen_perm_cloned = gen_perm.clone(); let mut spawn_node = move |node_ix| { let peer_sk = SecretKey::random(&mut rng); let peer_key = identity::Keypair::from(identity::secp256k1::Keypair::from(k256_to_libsecp256k1( @@ -95,16 +93,14 @@ pub fn setup_nodes(n: usize) -> Vec { peer_manager_msg_buffer_size: 1000, }; let handel_conf = HandelConfig { - threshold: Threshold { num: 8, denom: 8 }, + threshold, window_shrinking_factor: 4, initial_scoring_window: 3, fast_path_window: 10, dissemination_interval: Duration::from_millis(40), level_activation_delay: Duration::from_millis(50), - poll_fn_delay: Duration::from_millis(5), + poll_fn_delay: Duration::from_millis(2), }; - let seed = [0_u8; 32]; - let gen_perm = PseudoRandomGenPerm::new(seed); let (aggr_handler_snd, aggr_handler_inbox) = mpsc::channel::>(100); let sig_aggr = SigmaAggregation::new( peer_sk.clone(), @@ -120,42 +116,53 @@ pub fn setup_nodes(n: usize) -> Vec { let network_api = NetworkMailbox { mailbox_snd: requests_snd, }; - let (mut aggr_handler, aggr_mailbox) = ProtocolHandler::new(sig_aggr, network_api, SIGMA_AGGR_PROTOCOL_ID, 10); - let nc = NetworkController::new( - peer_conn_handler_conf, - HashMap::from([( - SIGMA_AGGR_PROTOCOL_ID, - (ProtocolConfig::OneShot(one_shot_proto_conf.clone()), aggr_mailbox), - )]), - peers, - peer_manager, - requests_recv, - ); - let (abortable_peer, handle) = - futures::future::abortable(create_swarm(peer_key.clone(), nc, peer_addr.clone(), node_ix)); - tokio::task::spawn(async move { - println!("PEER:{} :: spawning protocol handler..", node_ix); - loop { - aggr_handler.select_next_some().await; - } - }); - tokio::task::spawn(async move { - println!("PEER:{} :: spawning peer..", node_ix); - abortable_peer.await - }); + let (aggr_handler, aggr_mailbox): ( + ProtocolHandler< + SigmaAggregation>, + NetworkMailbox, + >, + _, + ) = ProtocolHandler::new(sig_aggr, network_api, SIGMA_AGGR_PROTOCOL_ID, 10); + let nc: NetworkController, ProtocolMailbox> = + NetworkController::new( + peer_conn_handler_conf, + HashMap::from([( + SIGMA_AGGR_PROTOCOL_ID, + (ProtocolConfig::OneShot(one_shot_proto_conf.clone()), aggr_mailbox), + )]), + peers, + peer_manager, + requests_recv, + ); Peer { peer_id, peer_addr, peer_pk: peer_sk.public_key(), - peer_handle: handle, + peer_sk, aggr_handler_mailbox: aggr_handler_snd, + aggr_handler, + nc, } }; let mut nodes = vec![]; for i in 0..n { nodes.push(spawn_node(i)); } - nodes + (nodes, MakeBinomialPeerPartitions { rng: gen_perm_cloned }) +} + +pub struct Peer<'de> { + pub peer_id: PeerId, + pub peer_addr: Multiaddr, + pub peer_pk: k256::PublicKey, + pub peer_sk: SecretKey, + pub aggr_handler_mailbox: Sender>, + pub aggr_handler: ProtocolHandler< + 'de, + SigmaAggregation<'de, Blake2b, MakeBinomialPeerPartitions>, + NetworkMailbox, + >, + pub nc: NetworkController, ProtocolMailbox>, } pub async fn create_swarm( diff --git a/spectrum-network/tests/integration_tests/mod.rs b/spectrum-network/tests/integration_tests/mod.rs index 388faa1f..89c68eda 100644 --- a/spectrum-network/tests/integration_tests/mod.rs +++ b/spectrum-network/tests/integration_tests/mod.rs @@ -20,6 +20,8 @@ use spectrum_crypto::pubkey::PublicKey; use spectrum_network::protocol::{OneShotProtocolConfig, OneShotProtocolSpec, ProtocolConfig}; use spectrum_network::protocol_api::ProtocolEvent; use spectrum_network::protocol_handler::aggregation::AggregationAction; +use spectrum_network::protocol_handler::handel::partitioning::{MakePeerPartitions, PeerIx, PeerPartitions}; +use spectrum_network::protocol_handler::handel::Threshold; use spectrum_network::types::{ProtocolTag, RawMessage}; use spectrum_network::{ network_controller::{NetworkController, NetworkControllerIn, NetworkControllerOut, NetworkMailbox}, @@ -970,9 +972,16 @@ async fn create_swarm

( #[cfg_attr(feature = "test_peer_punish_too_slow", ignore)] #[tokio::test] -async fn sigma_aggregation_normal() { +async fn sigma_aggregation_byzantine() { //init_logging_once_for(vec![], LevelFilter::Debug, None); - let mut peers = aggregation::setup_nodes(8); + let byzantine_nodes = vec![ + PeerIx::from(0), + PeerIx::from(15), + PeerIx::from(12), + PeerIx::from(7), + ]; + + let (peers, partitioner) = aggregation::setup_nodes(16, Threshold { num: 12, denom: 16 }); let md = blake2b256_hash(b"foo"); let committee: HashMap> = peers .iter() @@ -982,13 +991,56 @@ async fn sigma_aggregation_normal() { }| ((*peer_pk).into(), Some(peer_addr.clone())), ) .collect(); + let peers_and_addr: Vec<_> = committee + .iter() + .map(|(pk, addr)| (PeerId::from(pk.clone()), addr.clone())) + .collect(); + + let mut aggr_handler_mailboxes = vec![]; + let mut abort_handles = vec![]; + for (node_ix, mut peer) in peers.into_iter().enumerate() { + let peer_key = identity::Keypair::from(identity::secp256k1::Keypair::from( + aggregation::k256_to_libsecp256k1(peer.peer_sk.clone()), + )); + let (abortable_peer, handle) = futures::future::abortable(aggregation::create_swarm( + peer_key.clone(), + peer.nc, + peer.peer_addr.clone(), + node_ix, + )); + + let pk: PublicKey = peer.peer_pk.into(); + let host_pid = PeerId::from(pk); + let partitions = partitioner.make(host_pid, peers_and_addr.clone()); + let host_ix = partitions.try_index_peer(host_pid).unwrap(); + + if byzantine_nodes.contains(&host_ix) { + println!( + "PEER:{} IS BYZANTINE ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^", + node_ix + ); + } else { + abort_handles.push(handle); + aggr_handler_mailboxes.push(peer.aggr_handler_mailbox); + tokio::task::spawn(async move { + println!("PEER:{} :: spawning protocol handler..", node_ix); + loop { + peer.aggr_handler.select_next_some().await; + } + }); + tokio::task::spawn(async move { + println!("PEER:{} :: spawning peer..", node_ix); + abortable_peer.await + }); + } + } wasm_timer::Delay::new(Duration::from_millis(100)).await.unwrap(); let mut result_futures = Vec::new(); - for peer in peers.iter_mut() { + for aggr_handler_mailbox in aggr_handler_mailboxes { let (snd, recv) = oneshot::channel(); result_futures.push(recv); - async_std::task::block_on(peer.aggr_handler_mailbox.clone().send(AggregationAction::Reset { + async_std::task::block_on(aggr_handler_mailbox.clone().send(AggregationAction::Reset { new_committee: committee.clone(), new_message: md, channel: snd, @@ -998,23 +1050,35 @@ async fn sigma_aggregation_normal() { let started_at = Instant::now(); + let num_finished = std::sync::Arc::new(futures::lock::Mutex::new(0)); for (ix, fut) in result_futures.into_iter().enumerate() { + let num_finished = num_finished.clone(); async_std::task::spawn(async move { let res = fut.await; let finished_at = Instant::now(); let elapsed = finished_at.sub(started_at); match res { - Ok(_) => println!("PEER:{} :: Finished aggr in {} millis", ix, elapsed.as_millis()), + Ok(_) => { + *num_finished.lock().await += 1; + println!("PEER:{} :: Finished aggr in {} millis", ix, elapsed.as_millis()) + } Err(_) => println!("PEER:{} :: Failed aggr in {} millis", ix, elapsed.as_millis()), } }); } - wasm_timer::Delay::new(Duration::from_secs(2)).await.unwrap(); + wasm_timer::Delay::new(Duration::from_secs(10)).await.unwrap(); - for peer in &peers { - peer.peer_handle.abort(); + for abort_handle in abort_handles { + abort_handle.abort(); } + + let num_finished = *num_finished.lock().await; + println!( + "num_finished aggr: {}, {}%", + num_finished, + (num_finished as f64) / (16.0) * 100.0 + ); } fn make_swarm_components( From 6b16b328bd6458a4fa58c8ee207fde24792bcf63 Mon Sep 17 00:00:00 2001 From: Tim Ling <791016+kettlebell@users.noreply.github.com> Date: Thu, 15 Jun 2023 18:24:11 +1000 Subject: [PATCH 09/11] Docker implementation --- Dockerfile | 16 ++ .../tests/integration_tests/aggregation.rs | 28 ++- .../tests/integration_tests/mod.rs | 5 + spectrum-node/src/main.rs | 204 ++++++++++++++++-- 4 files changed, 238 insertions(+), 15 deletions(-) create mode 100644 Dockerfile diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..a7ca7c0b --- /dev/null +++ b/Dockerfile @@ -0,0 +1,16 @@ +FROM rust:slim AS builder + +RUN apt update && apt-get install -y clang +RUN update-ca-certificates + +WORKDIR /usr/src/app + +# copy entire workspace +COPY . . + +RUN cargo build --release + + +FROM debian:bullseye-slim +COPY --from=builder /usr/src/app/target/release/spectrum-node ./ +CMD [ "./spectrum-node" ] \ No newline at end of file diff --git a/spectrum-network/tests/integration_tests/aggregation.rs b/spectrum-network/tests/integration_tests/aggregation.rs index be5b2154..66712d96 100644 --- a/spectrum-network/tests/integration_tests/aggregation.rs +++ b/spectrum-network/tests/integration_tests/aggregation.rs @@ -208,7 +208,7 @@ pub struct IndividualPeerInfo { } /// Used for Docker testing. -fn generate_peer_info_files(num_nodes: usize) { +pub fn generate_peer_info_files(num_nodes: usize) { let mut rng = OsRng; let mut committee = HashMap::default(); let mut individual_peer_info = vec![]; @@ -229,6 +229,8 @@ fn generate_peer_info_files(num_nodes: usize) { individual_peer_info.push(peer_info); } + let mut docker_compose_str = String::from("services:\n"); + for (node_ix, info) in individual_peer_info.into_iter().enumerate() { let peer_info = PeerInfo { peer_id: info.peer_id, @@ -236,6 +238,18 @@ fn generate_peer_info_files(num_nodes: usize) { peer_sk_base_16: info.peer_sk_base_16, committee: committee.clone(), }; + docker_compose_str += &format!(" node{}:\n", node_ix); + docker_compose_str += " image: spectrum\n"; + docker_compose_str += " volumes:\n"; + docker_compose_str += " - type: bind\n"; + docker_compose_str += " source: ${PWD}/conf/log4rs.yaml\n"; + docker_compose_str += " target: /conf/log4rs.yaml\n"; + docker_compose_str += " - type: bind\n"; + docker_compose_str += &format!(" source: ${{PWD}}/conf/peer_input_{}.yml\n", node_ix); + docker_compose_str += " target: /conf/peer_info.yml\n"; + docker_compose_str += " networks:\n"; + docker_compose_str += " network:\n"; + docker_compose_str += &format!(" ipv4_address: 172.18.12.{}\n", node_ix); let yaml_string = serde_yaml::to_string(&peer_info).unwrap(); let mut file = std::fs::File::create(format!("peer_input_{}.yml", node_ix)).unwrap(); file.write_all(yaml_string.as_bytes()).unwrap(); @@ -243,4 +257,16 @@ fn generate_peer_info_files(num_nodes: usize) { // Flush the contents to ensure all data is written file.flush().unwrap(); } + docker_compose_str += "networks:\n"; + docker_compose_str += " network:\n"; + docker_compose_str += " driver: bridge\n"; + docker_compose_str += " ipam:\n"; + docker_compose_str += " driver: default\n"; + docker_compose_str += " config:\n"; + docker_compose_str += " - subnet: 172.18.12.0/16\n"; + let mut file = std::fs::File::create("docker-compose.yml").unwrap(); + file.write_all(docker_compose_str.as_bytes()).unwrap(); + + // Flush the contents to ensure all data is written + file.flush().unwrap(); } diff --git a/spectrum-network/tests/integration_tests/mod.rs b/spectrum-network/tests/integration_tests/mod.rs index 89c68eda..16007495 100644 --- a/spectrum-network/tests/integration_tests/mod.rs +++ b/spectrum-network/tests/integration_tests/mod.rs @@ -969,6 +969,11 @@ async fn create_swarm

( } } } +//#[cfg_attr(feature = "test_peer_punish_too_slow", ignore)] +//#[tokio::test] +//async fn gen_docker_files() { +// aggregation::generate_peer_info_files(64); +//} #[cfg_attr(feature = "test_peer_punish_too_slow", ignore)] #[tokio::test] diff --git a/spectrum-node/src/main.rs b/spectrum-node/src/main.rs index 4f7a06f1..f3c1eb16 100644 --- a/spectrum-node/src/main.rs +++ b/spectrum-node/src/main.rs @@ -1,32 +1,52 @@ use std::collections::HashMap; use std::error::Error; +use std::ops::Sub; use std::str::FromStr; -use std::time::Duration; +use std::time::{Duration, Instant}; +use async_std::fs::File; use futures::channel::mpsc; use futures::prelude::*; -use libp2p::identity; +use libp2p::core::upgrade::Version; +use libp2p::identity::{self, Keypair}; use libp2p::swarm::{SwarmBuilder, SwarmEvent}; -use libp2p::Multiaddr; -use libp2p::PeerId; +use libp2p::{noise, tcp, yamux, Multiaddr}; +use libp2p::{PeerId, Transport}; +use log::{error, info}; +use serde::{Deserialize, Serialize}; +use spectrum_crypto::digest::{blake2b256_hash, Blake2b}; +use spectrum_crypto::pubkey::PublicKey; use spectrum_network::network_controller::{NetworkController, NetworkControllerIn, NetworkMailbox}; use spectrum_network::peer_conn_handler::PeerConnHandlerConf; use spectrum_network::peer_manager::data::PeerDestination; use spectrum_network::peer_manager::peers_state::PeerRepo; -use spectrum_network::peer_manager::{NetworkingConfig, PeerManager, PeerManagerConfig}; -use spectrum_network::protocol::{DIFFUSION_PROTOCOL_ID, ProtocolConfig, StatefulProtocolConfig, StatefulProtocolSpec}; -use spectrum_network::protocol_handler::discovery::message::{ - DiscoveryMessage, DiscoveryMessageV1, DiscoverySpec, +use spectrum_network::peer_manager::{NetworkingConfig, PeerManager, PeerManagerConfig, PeersMailbox}; +use spectrum_network::protocol::{ + OneShotProtocolConfig, OneShotProtocolSpec, ProtocolConfig, StatefulProtocolConfig, StatefulProtocolSpec, + DISCOVERY_PROTOCOL_ID, SIGMA_AGGR_PROTOCOL_ID, }; +use spectrum_network::protocol_api::ProtocolMailbox; +use spectrum_network::protocol_handler::aggregation::AggregationAction; +use spectrum_network::protocol_handler::discovery::message::DiscoverySpec; use spectrum_network::protocol_handler::discovery::{DiscoveryBehaviour, NodeStatus}; +use spectrum_network::protocol_handler::handel::partitioning::{ + MakeBinomialPeerPartitions, PseudoRandomGenPerm, +}; +use spectrum_network::protocol_handler::handel::{HandelConfig, Threshold}; +use spectrum_network::protocol_handler::sigma_aggregation::SigmaAggregation; use spectrum_network::protocol_handler::ProtocolHandler; -use spectrum_network::types::Reputation; +use spectrum_network::types::{ProtocolVer, Reputation}; -#[async_std::main] +#[tokio::main] async fn main() -> Result<(), Box> { log4rs::init_file("conf/log4rs.yaml", Default::default()).unwrap(); + run_sigma_aggregation().await; + Ok(()) +} + +async fn run_sync_protocol() -> Result<(), Box> { let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); println!("Local peer id: {:?}", local_peer_id); @@ -88,7 +108,7 @@ async fn main() -> Result<(), Box> { }; let local_status = NodeStatus { - supported_protocols: Vec::from([DIFFUSION_PROTOCOL_ID]), + supported_protocols: Vec::from([DISCOVERY_PROTOCOL_ID]), height: 0, }; let sync_behaviour = DiscoveryBehaviour::new(peers.clone(), local_status); @@ -98,12 +118,16 @@ async fn main() -> Result<(), Box> { mailbox_snd: requests_snd, }; const PH_MSG_BUFFER_SIZE: usize = 10; - let (mut sync_handler, sync_mailbox) = - ProtocolHandler::new(sync_behaviour, network_api, DIFFUSION_PROTOCOL_ID, PH_MSG_BUFFER_SIZE); + let (mut sync_handler, sync_mailbox) = ProtocolHandler::new( + sync_behaviour, + network_api, + DISCOVERY_PROTOCOL_ID, + PH_MSG_BUFFER_SIZE, + ); let nc = NetworkController::new( peer_conn_handler_conf, HashMap::from([( - sync_handler.protocol, + DISCOVERY_PROTOCOL_ID, (ProtocolConfig::Stateful(sync_conf), sync_mailbox), )]), peers, @@ -130,3 +154,155 @@ async fn main() -> Result<(), Box> { } } } + +async fn run_sigma_aggregation() { + let one_shot_proto_conf = OneShotProtocolConfig { + version: ProtocolVer::default(), + spec: OneShotProtocolSpec { + max_message_size: 5000, + }, + }; + let peer_conn_handler_conf = PeerConnHandlerConf { + async_msg_buffer_size: 100, + sync_msg_buffer_size: 100, + open_timeout: Duration::from_secs(60), + initial_keep_alive: Duration::from_secs(120), + }; + let netw_config = NetworkingConfig { + min_known_peers: 1, + min_outbound: 1, + max_inbound: 10, + max_outbound: 20, + }; + let peer_manager_conf = PeerManagerConfig { + min_acceptable_reputation: Reputation::from(-50), + min_reputation: Reputation::from(-20), + conn_reset_outbound_backoff: Duration::from_secs(120), + conn_alloc_interval: Duration::from_secs(30), + prot_alloc_interval: Duration::from_secs(30), + protocols_allocation: Vec::new(), + peer_manager_msg_buffer_size: 1000, + }; + let handel_conf = HandelConfig { + threshold: Threshold { num: 8, denom: 8 }, + window_shrinking_factor: 4, + initial_scoring_window: 3, + fast_path_window: 10, + dissemination_interval: Duration::from_millis(100), + level_activation_delay: Duration::from_millis(50), + poll_fn_delay: Duration::from_millis(5), + }; + + let mut file = File::open("conf/peer_info.yml").await.unwrap(); + let mut yaml_string = String::new(); + file.read_to_string(&mut yaml_string).await.unwrap(); + + let PeerInfo { + peer_id, + peer_addr, + peer_sk_base_16, + committee, + } = serde_yaml::from_str(&yaml_string).unwrap(); + + let peer_sk_bytes = base16::decode(&peer_sk_base_16).unwrap(); + let peer_sk = k256::SecretKey::from_slice(&peer_sk_bytes).unwrap(); + + let peer_key = identity::Keypair::from(identity::secp256k1::Keypair::from(k256_to_libsecp256k1( + peer_sk.clone(), + ))); + + let seed = [0_u8; 32]; + let gen_perm = PseudoRandomGenPerm::new(seed); + + let (mut aggr_handler_snd, aggr_handler_inbox) = mpsc::channel::>(100); + let sig_aggr = SigmaAggregation::new( + peer_sk, + handel_conf, + MakeBinomialPeerPartitions { rng: gen_perm }, + aggr_handler_inbox, + ); + let peer_state = PeerRepo::new(netw_config, vec![]); + let (peer_manager, peers) = PeerManager::new(peer_state, peer_manager_conf); + let (requests_snd, requests_recv) = mpsc::channel::(100); + let network_api = NetworkMailbox { + mailbox_snd: requests_snd, + }; + let (mut aggr_handler, aggr_mailbox) = + ProtocolHandler::new(sig_aggr, network_api, SIGMA_AGGR_PROTOCOL_ID, 100); + let nc = NetworkController::new( + peer_conn_handler_conf, + HashMap::from([( + SIGMA_AGGR_PROTOCOL_ID, + (ProtocolConfig::OneShot(one_shot_proto_conf.clone()), aggr_mailbox), + )]), + peers, + peer_manager, + requests_recv, + ); + tokio::task::spawn(async move { + info!("spawning protocol handler"); + loop { + aggr_handler.select_next_some().await; + } + }); + + let md = blake2b256_hash(b"foo"); + let (snd, recv) = futures::channel::oneshot::channel(); + + aggr_handler_snd + .send(AggregationAction::Reset { + new_committee: committee.clone(), + new_message: md, + channel: snd, + }) + .await + .unwrap(); + + let started_at = Instant::now(); + async_std::task::spawn(async move { + let res = recv.await; + let finished_at = Instant::now(); + let elapsed = finished_at.sub(started_at); + match res { + Ok(_) => info!("Finished aggr in {} millis", elapsed.as_millis()), + Err(_) => error!("Failed aggr in {} millis", elapsed.as_millis()), + } + }); + + info!("spawning peer"); + create_swarm(peer_key.clone(), nc).await +} + +pub async fn create_swarm( + local_key: Keypair, + nc: NetworkController, ProtocolMailbox>, +) { + let transport = tcp::async_io::Transport::default() + .upgrade(Version::V1Lazy) + .authenticate(noise::Config::new(&local_key).unwrap()) // todo: avoid auth + .multiplex(yamux::Config::default()) + .boxed(); + let local_peer_id = PeerId::from(local_key.public()); + let mut swarm = SwarmBuilder::with_async_std_executor(transport, nc, local_peer_id).build(); + + swarm.listen_on("/ip4/0.0.0.0/tcp/8000".parse().unwrap()).unwrap(); + + loop { + match swarm.select_next_some().await { + SwarmEvent::NewListenAddr { address, .. } => info!("Listening on {:?}", address), + ce => {} + } + } +} + +#[derive(Serialize, Deserialize)] +pub struct PeerInfo { + pub peer_id: PeerId, + pub peer_addr: Multiaddr, + pub peer_sk_base_16: String, + pub committee: HashMap>, +} + +pub fn k256_to_libsecp256k1(secret_key: k256::SecretKey) -> identity::secp256k1::SecretKey { + identity::secp256k1::SecretKey::try_from_bytes(secret_key.to_bytes().as_mut_slice()).unwrap() +} From 2f2d8ed13f6fdbd44baeb9b78fd60ce436dec02f Mon Sep 17 00:00:00 2001 From: oskin1 Date: Thu, 15 Jun 2023 20:11:03 +0300 Subject: [PATCH 10/11] Update ciborium. --- spectrum-network/Cargo.toml | 2 +- spectrum-network/src/protocol_handler.rs | 64 +++++++++---------- .../src/protocol_handler/codec.rs | 2 +- .../src/protocol_handler/diffusion.rs | 2 +- .../src/protocol_handler/diffusion/message.rs | 2 +- .../src/protocol_handler/discovery.rs | 2 +- .../src/protocol_handler/discovery/message.rs | 2 +- .../src/protocol_handler/sigma_aggregation.rs | 2 +- .../sigma_aggregation/message.rs | 2 +- 9 files changed, 39 insertions(+), 41 deletions(-) diff --git a/spectrum-network/Cargo.toml b/spectrum-network/Cargo.toml index 4e80bb72..98612f0b 100644 --- a/spectrum-network/Cargo.toml +++ b/spectrum-network/Cargo.toml @@ -27,7 +27,7 @@ rand = "0.8.5" rand_chacha = "0.3.1" wasm-timer = "0.2.5" serde = { version = "1.0.147", features = ["derive"] } -ciborium = "0.2.0" +ciborium = "0.2.1" smallvec = "1.10.0" derive_more = "0.99.17" either = { version = "1.8.1", features = ["serde"] } diff --git a/spectrum-network/src/protocol_handler.rs b/spectrum-network/src/protocol_handler.rs index 696753a2..e46fefa3 100644 --- a/spectrum-network/src/protocol_handler.rs +++ b/spectrum-network/src/protocol_handler.rs @@ -114,15 +114,15 @@ pub enum ProtocolHandlerError { MalformedMessage(RawMessage), } -pub trait ProtocolSpec<'de> { - type THandshake: serde::Serialize + serde::Deserialize<'de> + Versioned + Send; - type TMessage: serde::Serialize + serde::Deserialize<'de> + Versioned + Debug + Send + Clone; +pub trait ProtocolSpec { + type THandshake: serde::Serialize + for<'de> serde::Deserialize<'de> + Versioned + Send; + type TMessage: serde::Serialize + for<'de> serde::Deserialize<'de> + Versioned + Debug + Send + Clone; } -impl<'de, L, R> ProtocolSpec<'de> for Either +impl ProtocolSpec for Either where - L: ProtocolSpec<'de>, - R: ProtocolSpec<'de>, + L: ProtocolSpec, + R: ProtocolSpec, { type THandshake = Either; type TMessage = Either; @@ -157,21 +157,21 @@ pub trait TemporalProtocolStage { ) -> Poll, TOut>>; } -pub trait ProtocolBehaviour<'de> { +pub trait ProtocolBehaviour { /// Protocol specification. - type TProto: ProtocolSpec<'de>; + type TProto: ProtocolSpec; /// Inject an event that we have established a conn with a peer. fn inject_peer_connected(&mut self, peer_id: PeerId) {} /// Inject a new message coming from a peer. - fn inject_message(&mut self, peer_id: PeerId, content: >::TMessage) {} + fn inject_message(&mut self, peer_id: PeerId, content: ::TMessage) {} /// Inject protocol request coming from a peer. fn inject_protocol_requested( &mut self, peer_id: PeerId, - handshake: Option<>::THandshake>, + handshake: Option<::THandshake>, ) { } @@ -182,7 +182,7 @@ pub trait ProtocolBehaviour<'de> { fn inject_protocol_enabled( &mut self, peer_id: PeerId, - handshake: Option<>::THandshake>, + handshake: Option<::THandshake>, ) { } @@ -196,35 +196,35 @@ pub trait ProtocolBehaviour<'de> { ) -> Poll< Option< ProtocolBehaviourOut< - >::THandshake, - >::TMessage, + ::THandshake, + ::TMessage, >, >, >; } -pub struct BehaviourStream<'de, T, P>(T, PhantomData<&'de P>); +pub struct BehaviourStream(T, PhantomData

); -impl<'de, T, P> BehaviourStream<'de, T, P> { +impl BehaviourStream { pub fn new(behaviour: T) -> Self { Self(behaviour, PhantomData) } } -impl<'de, T, P> Unpin for BehaviourStream<'de, T, P> +impl Unpin for BehaviourStream where - P: ProtocolSpec<'de>, - T: ProtocolBehaviour<'de, TProto = P>, + P: ProtocolSpec, + T: ProtocolBehaviour, { } -impl<'de, T, P> Stream for BehaviourStream<'de, T, P> +impl Stream for BehaviourStream where - P: ProtocolSpec<'de>, - T: ProtocolBehaviour<'de, TProto = P>, + P: ProtocolSpec, + T: ProtocolBehaviour, { type Item = - ProtocolBehaviourOut<

>::THandshake,

>::TMessage>; + ProtocolBehaviourOut<

::THandshake,

::TMessage>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; @@ -232,10 +232,10 @@ where } } -impl<'de, T, P> FusedStream for BehaviourStream<'de, T, P> +impl FusedStream for BehaviourStream where - P: ProtocolSpec<'de>, - T: ProtocolBehaviour<'de, TProto = P>, + P: ProtocolSpec, + T: ProtocolBehaviour, { fn is_terminated(&self) -> bool { false @@ -243,16 +243,15 @@ where } /// A layer that facilitate massage transmission from protocol handlers to peers. -pub struct ProtocolHandler<'de, TBehaviour, TNetwork> { +pub struct ProtocolHandler { peers: HashMap, inbox: Receiver, pub protocol: ProtocolId, behaviour: TBehaviour, network: TNetwork, - pd: PhantomData<&'de Void>, } -impl<'de, TBehaviour, TNetwork> ProtocolHandler<'de, TBehaviour, TNetwork> { +impl ProtocolHandler { pub fn new( behaviour: TBehaviour, network: TNetwork, @@ -267,19 +266,18 @@ impl<'de, TBehaviour, TNetwork> ProtocolHandler<'de, TBehaviour, TNetwork> { protocol, behaviour, network, - pd: PhantomData::default(), }; (prot_handler, prot_mailbox) } } -impl<'de, TBehaviour, TNetwork> Stream for ProtocolHandler<'de, TBehaviour, TNetwork> +impl Stream for ProtocolHandler where - TBehaviour: ProtocolBehaviour<'de> + Unpin, + TBehaviour: ProtocolBehaviour + Unpin, TNetwork: NetworkAPI + Unpin, { #[cfg(feature = "integration_tests")] - type Item = >::TMessage; + type Item = ::TMessage; #[cfg(not(feature = "integration_tests"))] type Item = (); @@ -430,7 +428,7 @@ where } /// The stream of protocol events never terminates, so we can implement fused for it. -impl<'de, TBehaviour, TNetwork> FusedStream for ProtocolHandler<'de, TBehaviour, TNetwork> +impl FusedStream for ProtocolHandler where Self: Stream, { diff --git a/spectrum-network/src/protocol_handler/codec.rs b/spectrum-network/src/protocol_handler/codec.rs index 1f85e419..2fba82e1 100644 --- a/spectrum-network/src/protocol_handler/codec.rs +++ b/spectrum-network/src/protocol_handler/codec.rs @@ -9,7 +9,7 @@ pub fn encode(obj: T) -> RawMessage { RawMessage::from(encoded) } -pub fn decode<'de, T: Deserialize<'de>>(msg: RawMessage) -> Result> { +pub fn decode Deserialize<'de>>(msg: RawMessage) -> Result> { let bf: Vec = msg.into(); ciborium::de::from_reader(&bf[..]) } diff --git a/spectrum-network/src/protocol_handler/diffusion.rs b/spectrum-network/src/protocol_handler/diffusion.rs index cc24acf8..f5a8e02e 100644 --- a/spectrum-network/src/protocol_handler/diffusion.rs +++ b/spectrum-network/src/protocol_handler/diffusion.rs @@ -314,7 +314,7 @@ fn decode_modifier( res.map_err(|_| ()) } -impl<'a, 'de, THistory, TLedgerView> ProtocolBehaviour<'de> for DiffusionBehaviour<'a, THistory, TLedgerView> +impl<'a, THistory, TLedgerView> ProtocolBehaviour for DiffusionBehaviour<'a, THistory, TLedgerView> where THistory: HistoryReadAsync + 'a, TLedgerView: LedgerViewWriteAsync + 'a, diff --git a/spectrum-network/src/protocol_handler/diffusion/message.rs b/spectrum-network/src/protocol_handler/diffusion/message.rs index 4f788ed0..7c625f1a 100644 --- a/spectrum-network/src/protocol_handler/diffusion/message.rs +++ b/spectrum-network/src/protocol_handler/diffusion/message.rs @@ -88,7 +88,7 @@ impl DiffusionSpec { } } -impl<'de> ProtocolSpec<'de> for DiffusionSpec { +impl ProtocolSpec for DiffusionSpec { type THandshake = DiffusionHandshake; type TMessage = DiffusionMessage; } diff --git a/spectrum-network/src/protocol_handler/discovery.rs b/spectrum-network/src/protocol_handler/discovery.rs index 59f716bc..a17b280d 100644 --- a/spectrum-network/src/protocol_handler/discovery.rs +++ b/spectrum-network/src/protocol_handler/discovery.rs @@ -101,7 +101,7 @@ where } } -impl<'de, TPeers> ProtocolBehaviour<'de> for DiscoveryBehaviour +impl ProtocolBehaviour for DiscoveryBehaviour where TPeers: Peers, { diff --git a/spectrum-network/src/protocol_handler/discovery/message.rs b/spectrum-network/src/protocol_handler/discovery/message.rs index 128bda15..58c2e963 100644 --- a/spectrum-network/src/protocol_handler/discovery/message.rs +++ b/spectrum-network/src/protocol_handler/discovery/message.rs @@ -52,7 +52,7 @@ impl DiscoverySpec { } } -impl<'de> ProtocolSpec<'de> for DiscoverySpec { +impl ProtocolSpec for DiscoverySpec { type THandshake = DiscoveryHandshake; type TMessage = DiscoveryMessage; } diff --git a/spectrum-network/src/protocol_handler/sigma_aggregation.rs b/spectrum-network/src/protocol_handler/sigma_aggregation.rs index e4a80b48..4c4dec61 100644 --- a/spectrum-network/src/protocol_handler/sigma_aggregation.rs +++ b/spectrum-network/src/protocol_handler/sigma_aggregation.rs @@ -324,7 +324,7 @@ where } } -impl<'a, 'de, H, MPP> ProtocolBehaviour<'de> for SigmaAggregation<'a, H, MPP> +impl<'a, H, MPP> ProtocolBehaviour for SigmaAggregation<'a, H, MPP> where H: Debug, MPP: MakePeerPartitions + Clone + Send, diff --git a/spectrum-network/src/protocol_handler/sigma_aggregation/message.rs b/spectrum-network/src/protocol_handler/sigma_aggregation/message.rs index 4f63c982..0addc4cd 100644 --- a/spectrum-network/src/protocol_handler/sigma_aggregation/message.rs +++ b/spectrum-network/src/protocol_handler/sigma_aggregation/message.rs @@ -29,7 +29,7 @@ impl Versioned for SigmaAggrMessage { pub struct SigmaAggrSpec; -impl<'de> ProtocolSpec<'de> for SigmaAggrSpec { +impl ProtocolSpec for SigmaAggrSpec { type THandshake = VoidMessage; type TMessage = SigmaAggrMessage; } From 06164a03d053b8cc317d04ce78b882970da9097a Mon Sep 17 00:00:00 2001 From: Tim Ling <791016+kettlebell@users.noreply.github.com> Date: Mon, 19 Jun 2023 14:22:03 +1000 Subject: [PATCH 11/11] Fix lifetime errors --- .../tests/integration_tests/aggregation.rs | 1 - .../tests/integration_tests/fake_sync_behaviour.rs | 8 +++----- spectrum-network/tests/integration_tests/mod.rs | 12 ++++-------- spectrum-network/tests/tests.rs | 11 ++++++----- 4 files changed, 13 insertions(+), 19 deletions(-) diff --git a/spectrum-network/tests/integration_tests/aggregation.rs b/spectrum-network/tests/integration_tests/aggregation.rs index 66712d96..a2a279d5 100644 --- a/spectrum-network/tests/integration_tests/aggregation.rs +++ b/spectrum-network/tests/integration_tests/aggregation.rs @@ -158,7 +158,6 @@ pub struct Peer<'de> { pub peer_sk: SecretKey, pub aggr_handler_mailbox: Sender>, pub aggr_handler: ProtocolHandler< - 'de, SigmaAggregation<'de, Blake2b, MakeBinomialPeerPartitions>, NetworkMailbox, >, diff --git a/spectrum-network/tests/integration_tests/fake_sync_behaviour.rs b/spectrum-network/tests/integration_tests/fake_sync_behaviour.rs index 9fe4dc69..8204ebb5 100644 --- a/spectrum-network/tests/integration_tests/fake_sync_behaviour.rs +++ b/spectrum-network/tests/integration_tests/fake_sync_behaviour.rs @@ -52,7 +52,7 @@ impl FakeSyncSpec { } } -impl<'de> spectrum_network::protocol_handler::ProtocolSpec<'de> for FakeSyncSpec { +impl spectrum_network::protocol_handler::ProtocolSpec for FakeSyncSpec { type THandshake = DiscoveryHandshake; type TMessage = FakeSyncMessage; } @@ -128,7 +128,7 @@ where } } -impl<'de, TPeers> ProtocolBehaviour<'de> for FakeSyncBehaviour +impl ProtocolBehaviour for FakeSyncBehaviour where TPeers: Peers, { @@ -176,9 +176,7 @@ where fn inject_protocol_enabled( &mut self, peer_id: PeerId, - _handshake: Option< - >::THandshake, - >, + _handshake: Option<::THandshake>, ) { self.send_fake_msg(peer_id); } diff --git a/spectrum-network/tests/integration_tests/mod.rs b/spectrum-network/tests/integration_tests/mod.rs index 16007495..a6c01547 100644 --- a/spectrum-network/tests/integration_tests/mod.rs +++ b/spectrum-network/tests/integration_tests/mod.rs @@ -940,14 +940,10 @@ async fn create_swarm

( peer: Peer, mut tx: mpsc::Sender<( Peer, - Msg< - <

>::TProto as spectrum_network::protocol_handler::ProtocolSpec< - 'static, - >>::TMessage, - >, + Msg<<

::TProto as spectrum_network::protocol_handler::ProtocolSpec>::TMessage>, )>, ) where - P: ProtocolBehaviour<'static> + Unpin + Send + 'static, + P: ProtocolBehaviour + Unpin + Send + 'static, { let transport = tcp::async_io::Transport::default() .upgrade(Version::V1Lazy) @@ -1091,11 +1087,11 @@ fn make_swarm_components( gen_protocol_behaviour: F, msg_buffer_size: usize, ) -> ( - ProtocolHandler<'static, P, NetworkMailbox>, + ProtocolHandler, NetworkController, ProtocolMailbox>, ) where - P: ProtocolBehaviour<'static> + Unpin + Send + 'static, + P: ProtocolBehaviour + Unpin + Send + 'static, F: FnOnce(PeersMailbox) -> P, { let peer_conn_handler_conf = PeerConnHandlerConf { diff --git a/spectrum-network/tests/tests.rs b/spectrum-network/tests/tests.rs index 93101c11..97be2799 100644 --- a/spectrum-network/tests/tests.rs +++ b/spectrum-network/tests/tests.rs @@ -32,7 +32,7 @@ use spectrum_network::protocol::{ }; use spectrum_network::protocol_api::ProtocolMailbox; use spectrum_network::protocol_handler::discovery::message::DiscoverySpec; -use spectrum_network::protocol_handler::discovery::{NodeStatus, DiscoveryBehaviour}; +use spectrum_network::protocol_handler::discovery::{DiscoveryBehaviour, NodeStatus}; use spectrum_network::protocol_handler::ProtocolHandler; use spectrum_network::types::Reputation; @@ -116,7 +116,7 @@ pub fn build_node<'de>( local_status: NodeStatus, ) -> ( Swarm, - ProtocolHandler<'de, DiscoveryBehaviour, NetworkMailbox>, + ProtocolHandler, NetworkMailbox>, ) { let noise_keys = noise::Keypair::::new() .into_authentic(&keypair) @@ -171,7 +171,8 @@ pub fn build_node<'de>( let network_api = NetworkMailbox { mailbox_snd: requests_snd, }; - let (sync_handler, sync_mailbox) = ProtocolHandler::new(sync_behaviour, network_api, DISCOVERY_PROTOCOL_ID, 10); + let (sync_handler, sync_mailbox) = + ProtocolHandler::new(sync_behaviour, network_api, DISCOVERY_PROTOCOL_ID, 10); let nc = NetworkController::new( peer_conn_handler_conf, HashMap::from([( @@ -197,11 +198,11 @@ pub fn build_node<'de>( /// Builds two nodes that have each other as bootstrap nodes. /// This is to be used only for testing, and a panic will happen if something goes wrong. #[allow(clippy::type_complexity)] -pub fn build_nodes<'de>( +pub fn build_nodes( n: usize, ) -> Vec<( Swarm, - ProtocolHandler<'de, DiscoveryBehaviour, NetworkMailbox>, + ProtocolHandler, NetworkMailbox>, )> { let mut out = Vec::with_capacity(n);