Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ use commonware_cryptography::{
bls12381::primitives::variant::MinPk, certificate::mocks::Fixture,
ed25519::PublicKey as Ed25519PublicKey,
};
use commonware_parallel::Sequential;
use commonware_runtime::deterministic;
use libfuzzer_sys::fuzz_target;

struct SimplexBls12381MinPk;

impl Simplex for SimplexBls12381MinPk {
type Scheme = bls12381_threshold::Scheme<Ed25519PublicKey, MinPk, Sequential>;
type Scheme = bls12381_threshold::Scheme<Ed25519PublicKey, MinPk>;
type Elector = Random;

fn fixture(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ use commonware_cryptography::{
bls12381::primitives::variant::MinSig, certificate::mocks::Fixture,
ed25519::PublicKey as Ed25519PublicKey,
};
use commonware_parallel::Sequential;
use commonware_runtime::deterministic;
use libfuzzer_sys::fuzz_target;

struct SimplexBls12381MinSig;

impl Simplex for SimplexBls12381MinSig {
type Scheme = bls12381_threshold::Scheme<Ed25519PublicKey, MinSig, Sequential>;
type Scheme = bls12381_threshold::Scheme<Ed25519PublicKey, MinSig>;
type Elector = Random;

fn fixture(
Expand Down
13 changes: 2 additions & 11 deletions consensus/fuzz/fuzz_targets/simplex_elector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use commonware_cryptography::{
Sha256, Signer,
};
use commonware_math::algebra::Random as _;
use commonware_parallel::Sequential;
use commonware_utils::{ordered::Set, TryCollect};
use libfuzzer_sys::fuzz_target;
use rand::{rngs::StdRng, SeedableRng};
Expand Down Expand Up @@ -77,18 +76,10 @@ fuzz_target!(|input: FuzzInput| {
fuzz::<ed25519::Scheme, _>(&input, RoundRobin::<Sha256>::shuffled(seed), None);
}
FuzzElector::RandomMinPk(certificate) => {
fuzz::<bls12381_threshold::Scheme<_, MinPk, Sequential>, _>(
&input,
Random,
Some(certificate),
);
fuzz::<bls12381_threshold::Scheme<_, MinPk>, _>(&input, Random, Some(certificate));
}
FuzzElector::RandomMinSig(certificate) => {
fuzz::<bls12381_threshold::Scheme<_, MinSig, Sequential>, _>(
&input,
Random,
Some(certificate),
);
fuzz::<bls12381_threshold::Scheme<_, MinSig>, _>(&input, Random, Some(certificate));
}
}
});
5 changes: 2 additions & 3 deletions consensus/fuzz/fuzz_targets/simplex_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ use commonware_cryptography::{
ed25519::PublicKey,
sha256,
};
use commonware_parallel::Sequential;
use libfuzzer_sys::fuzz_target;

type Ed25519Scheme = ed25519::Scheme;
type Bls12381MultisigMinPk = bls12381_multisig::Scheme<PublicKey, MinPk>;
type Bls12381MultisigMinSig = bls12381_multisig::Scheme<PublicKey, MinSig>;
type ThresholdSchemeMinPk = bls12381_threshold::Scheme<PublicKey, MinPk, Sequential>;
type ThresholdSchemeMinSig = bls12381_threshold::Scheme<PublicKey, MinSig, Sequential>;
type ThresholdSchemeMinPk = bls12381_threshold::Scheme<PublicKey, MinPk>;
type ThresholdSchemeMinSig = bls12381_threshold::Scheme<PublicKey, MinSig>;

#[derive(Arbitrary, Debug)]
enum FuzzInput {
Expand Down
2 changes: 2 additions & 0 deletions consensus/fuzz/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use commonware_cryptography::{
Sha256,
};
use commonware_p2p::simulated::{Config as NetworkConfig, Link, Network};
use commonware_parallel::Sequential;
use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Runner, Spawner};
use commonware_utils::{max_faults, NZUsize, NZU16};
use futures::{channel::mpsc::Receiver, future::join_all, StreamExt};
Expand Down Expand Up @@ -262,6 +263,7 @@ fn run<P: Simplex>(input: FuzzInput) {
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
strategy: Sequential,
};
let engine = Engine::new(context.with_label("engine"), engine_cfg);
engine.start(pending, recovered, resolver);
Expand Down
5 changes: 5 additions & 0 deletions consensus/src/aggregation/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use commonware_cryptography::{
Digest,
};
use commonware_p2p::Blocker;
use commonware_parallel::Strategy;
use commonware_runtime::buffer::PoolRef;
use commonware_utils::NonZeroDuration;
use std::num::{NonZeroU64, NonZeroUsize};
Expand All @@ -20,6 +21,7 @@ pub struct Config<
Z: Reporter<Activity = Activity<P::Scheme, D>>,
M: Monitor<Index = Epoch>,
B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
T: Strategy,
> {
/// Tracks the current state of consensus (to determine which participants should
/// be involved in the current broadcast attempt).
Expand Down Expand Up @@ -77,4 +79,7 @@ pub struct Config<

/// Buffer pool for the journal.
pub journal_buffer_pool: PoolRef,

/// Strategy for parallel operations.
pub strategy: T,
}
13 changes: 9 additions & 4 deletions consensus/src/aggregation/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use commonware_p2p::{
utils::codec::{wrap, WrappedSender},
Blocker, Receiver, Recipients, Sender,
};
use commonware_parallel::Strategy;
use commonware_runtime::{
buffer::PoolRef,
spawn_cell,
Expand Down Expand Up @@ -77,6 +78,7 @@ pub struct Engine<
Z: Reporter<Activity = Activity<P::Scheme, D>>,
M: Monitor<Index = Epoch>,
B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
T: Strategy,
> {
// ---------- Interfaces ----------
context: ContextCell<E>,
Expand All @@ -85,6 +87,7 @@ pub struct Engine<
provider: P,
reporter: Z,
blocker: B,
strategy: T,

// Pruning
/// A tuple representing the epochs to keep in memory.
Expand Down Expand Up @@ -158,10 +161,11 @@ impl<
Z: Reporter<Activity = Activity<P::Scheme, D>>,
M: Monitor<Index = Epoch>,
B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
> Engine<E, P, D, A, Z, M, B>
T: Strategy,
> Engine<E, P, D, A, Z, M, B, T>
{
/// Creates a new engine with the given context and configuration.
pub fn new(context: E, cfg: Config<P, D, A, Z, M, B>) -> Self {
pub fn new(context: E, cfg: Config<P, D, A, Z, M, B, T>) -> Self {
// TODO(#1833): Metrics should use the post-start context
let metrics = metrics::Metrics::init(context.clone());

Expand All @@ -172,6 +176,7 @@ impl<
monitor: cfg.monitor,
provider: cfg.provider,
blocker: cfg.blocker,
strategy: cfg.strategy,
epoch_bounds: cfg.epoch_bounds,
window: HeightDelta::new(cfg.window.into()),
activity_timeout: cfg.activity_timeout,
Expand Down Expand Up @@ -519,7 +524,7 @@ impl<
.filter(|a| a.item.digest == ack.item.digest)
.collect::<Vec<_>>();
if filtered.len() >= quorum as usize {
if let Some(certificate) = Certificate::from_acks(&*scheme, filtered) {
if let Some(certificate) = Certificate::from_acks(&*scheme, filtered, &self.strategy) {
self.metrics.certificates.inc();
self.handle_certificate(certificate).await;
}
Expand Down Expand Up @@ -666,7 +671,7 @@ impl<
}

// Validate signature
if !ack.verify(&mut self.context, &*scheme) {
if !ack.verify(&mut self.context, &*scheme, &self.strategy) {
return Err(Error::InvalidAckSignature);
}

Expand Down
5 changes: 3 additions & 2 deletions consensus/src/aggregation/mocks/reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
};
use commonware_codec::{Decode, DecodeExt, Encode};
use commonware_cryptography::{certificate::Scheme, Digest};
use commonware_parallel::Sequential;
use futures::{
channel::{mpsc, oneshot},
SinkExt, StreamExt,
Expand Down Expand Up @@ -77,7 +78,7 @@ where
match msg {
Message::Ack(ack) => {
// Verify properly constructed (not needed in production)
assert!(ack.verify(&mut self.rng, &self.scheme));
assert!(ack.verify(&mut self.rng, &self.scheme, &Sequential));

// Test encoding/decoding
let encoded = ack.encode();
Expand All @@ -91,7 +92,7 @@ where
}
Message::Certified(certificate) => {
// Verify certificate
assert!(certificate.verify(&mut self.rng, &self.scheme));
assert!(certificate.verify(&mut self.rng, &self.scheme, &Sequential));

// Test encoding/decoding
let encoded = certificate.encode();
Expand Down
6 changes: 6 additions & 0 deletions consensus/src/aggregation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ mod tests {
};
use commonware_macros::{select, test_group, test_traced};
use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
use commonware_parallel::Sequential;
use commonware_runtime::{
buffer::PoolRef,
deterministic::{self, Context},
Expand Down Expand Up @@ -249,6 +250,7 @@ mod tests {
journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
journal_compression: Some(3),
journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
strategy: Sequential,
},
);

Expand Down Expand Up @@ -492,6 +494,7 @@ mod tests {
journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
journal_compression: Some(3),
journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
strategy: Sequential,
},
);

Expand Down Expand Up @@ -640,6 +643,7 @@ mod tests {
journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
journal_compression: Some(3),
journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
strategy: Sequential,
},
);

Expand Down Expand Up @@ -722,6 +726,7 @@ mod tests {
journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
journal_compression: Some(3),
journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
strategy: Sequential,
},
);

Expand Down Expand Up @@ -1062,6 +1067,7 @@ mod tests {
journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
journal_compression: Some(3),
journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
strategy: Sequential,
},
);

Expand Down
26 changes: 16 additions & 10 deletions consensus/src/aggregation/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use commonware_cryptography::{
certificate::{self, Attestation, Scheme, Subject},
Digest,
};
use commonware_parallel::Strategy;
use commonware_utils::union;
use futures::channel::oneshot;
use rand_core::CryptoRngCore;
Expand Down Expand Up @@ -181,12 +182,12 @@ impl<S: Scheme, D: Digest> Ack<S, D> {
///
/// Returns `true` if the attestation is valid for the given namespace and public key.
/// Domain separation is automatically applied to prevent signature reuse.
pub fn verify<R>(&self, rng: &mut R, scheme: &S) -> bool
pub fn verify<R>(&self, rng: &mut R, scheme: &S, strategy: &impl Strategy) -> bool
where
R: CryptoRngCore,
S: scheme::Scheme<D>,
{
scheme.verify_attestation::<_, D>(rng, &self.item, &self.attestation)
scheme.verify_attestation::<_, D>(rng, &self.item, &self.attestation, strategy)
}

/// Creates a new acknowledgment by signing an item with a validator's key.
Expand Down Expand Up @@ -313,7 +314,11 @@ pub struct Certificate<S: Scheme, D: Digest> {
}

impl<S: Scheme, D: Digest> Certificate<S, D> {
pub fn from_acks<'a>(scheme: &S, acks: impl IntoIterator<Item = &'a Ack<S, D>>) -> Option<Self>
pub fn from_acks<'a>(
scheme: &S,
acks: impl IntoIterator<Item = &'a Ack<S, D>>,
strategy: &impl Strategy,
) -> Option<Self>
where
S: scheme::Scheme<D>,
{
Expand All @@ -322,18 +327,18 @@ impl<S: Scheme, D: Digest> Certificate<S, D> {
let attestations = iter
.filter(|ack| ack.item == item)
.map(|ack| ack.attestation.clone());
let certificate = scheme.assemble(attestations)?;
let certificate = scheme.assemble(attestations, strategy)?;

Some(Self { item, certificate })
}

/// Verifies the recovered certificate for the item.
pub fn verify<R>(&self, rng: &mut R, scheme: &S) -> bool
pub fn verify<R>(&self, rng: &mut R, scheme: &S, strategy: &impl Strategy) -> bool
where
R: CryptoRngCore,
S: scheme::Scheme<D>,
{
scheme.verify_certificate::<_, D>(rng, &self.item, &self.certificate)
scheme.verify_certificate::<_, D>(rng, &self.item, &self.certificate, strategy)
}
}

Expand Down Expand Up @@ -464,6 +469,7 @@ mod tests {
certificate::mocks::Fixture,
Hasher, Sha256,
};
use commonware_parallel::Sequential;
use commonware_utils::{ordered::Quorum, test_rng};
use rand::rngs::StdRng;

Expand Down Expand Up @@ -504,7 +510,7 @@ mod tests {
// Verify the restored ack
assert_eq!(restored_ack.item, item);
assert_eq!(restored_ack.epoch, Epoch::new(1));
assert!(restored_ack.verify(&mut rng, &schemes[0]));
assert!(restored_ack.verify(&mut rng, &schemes[0], &Sequential));

// Test TipAck codec
let tip_ack = TipAck {
Expand Down Expand Up @@ -537,16 +543,16 @@ mod tests {
.filter_map(|scheme| Ack::sign(scheme, Epoch::new(1), item.clone()))
.collect();

let certificate = Certificate::from_acks(&schemes[0], &acks).unwrap();
assert!(certificate.verify(&mut rng, &schemes[0]));
let certificate = Certificate::from_acks(&schemes[0], &acks, &Sequential).unwrap();
assert!(certificate.verify(&mut rng, &schemes[0], &Sequential));

let activity_certified = Activity::Certified(certificate.clone());
let encoded_certified = activity_certified.encode();
let restored_activity_certified: Activity<S, Sha256Digest> =
Activity::decode_cfg(encoded_certified, &cfg).unwrap();
if let Activity::Certified(restored) = restored_activity_certified {
assert_eq!(restored.item, item);
assert!(restored.verify(&mut rng, &schemes[0]));
assert!(restored.verify(&mut rng, &schemes[0], &Sequential));
} else {
panic!("Expected Activity::Certified");
}
Expand Down
Loading
Loading