Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions coding/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,13 @@ pub trait Scheme: Debug + Clone + Send + Sync + 'static {
/// the data.
type ReShard: Clone + Eq + Codec<Cfg = CodecConfig> + Send + Sync + 'static;
/// Data which can assist in checking shards.
type CheckingData: Clone + Send;
type CheckingData: Clone + Send + Sync;
/// A shard that has been checked for inclusion in the commitment.
///
/// This allows excluding [Scheme::ReShard]s which are invalid, and shouldn't
/// be considered as progress towards meeting the minimum number of shards.
type CheckedShard;
type Error: std::fmt::Debug;
type CheckedShard: Clone + Send;
type Error: std::fmt::Debug + Send;

/// Encode a piece of data, returning a commitment, along with shards, and proofs.
///
Expand Down
1 change: 1 addition & 0 deletions coding/src/zoda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ where
}

/// A ZODA shard that has been checked for integrity already.
#[derive(Clone)]
pub struct CheckedShard {
index: usize,
shard: Matrix,
Expand Down
8 changes: 5 additions & 3 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ bytes.workspace = true
cfg-if.workspace = true
commonware-broadcast.workspace = true
commonware-codec.workspace = true
commonware-coding.workspace = true
commonware-cryptography.workspace = true
commonware-math = { workspace = true, optional = true }
commonware-math.workspace = true
commonware-resolver.workspace = true
commonware-utils.workspace = true
futures.workspace = true
Expand All @@ -36,13 +37,14 @@ commonware-storage = { workspace = true, features = ["std"] }
pin-project.workspace = true
prometheus-client.workspace = true
rand_distr.workspace = true
rayon.workspace = true
tracing.workspace = true

[dev-dependencies]
commonware-conformance.workspace = true
commonware-consensus = { path = ".", features = ["mocks"] }
commonware-math.workspace = true
commonware-resolver = { workspace = true, features = ["mocks"] }
commonware-runtime = { workspace = true, features = ["test-utils"] }
rstest.workspace = true
tracing-subscriber.workspace = true

Expand All @@ -55,9 +57,9 @@ crate-type = ["rlib", "cdylib"]

[features]
mocks = [ "commonware-cryptography/mocks" ]
fuzz = [ "dep:commonware-math", "mocks" ]
arbitrary = [
"commonware-codec/arbitrary",
"commonware-coding/arbitrary",
"commonware-cryptography/arbitrary",
"commonware-math/arbitrary",
"commonware-p2p/arbitrary",
Expand Down
14 changes: 13 additions & 1 deletion consensus/conformance.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@ hash = "69536ce6262449bc22fd256943545a3c9d8c5290b67a9a6e8396a5e5cf761961"
n_cases = 65536
hash = "d22d425105d9dda042400a4040dfd06ddcc61957cffebad4b2d5c527979b07f3"

["commonware_consensus::marshal::ingress::handler::tests::conformance::CodecConformance<Request<B>>"]
["commonware_consensus::marshal::coding::types::test::conformance::CodecConformance<DistributionShard<ReedSolomon<Sha256>>>"]
n_cases = 65536
hash = "dbada4322b76e25ff6e66144c8231864593fdec4a5b2ab6ba579108f0066ce71"

["commonware_consensus::marshal::coding::types::test::conformance::CodecConformance<Shard<ReedSolomon<Sha256>,Sha256>>"]
n_cases = 65536
hash = "47738dfddcfeba090b46eb9f4af2cc31544f9b9433d49c19fcd861b16af4b861"

["commonware_consensus::marshal::resolver::handler::tests::conformance::CodecConformance<Request<B>>"]
n_cases = 65536
hash = "85cd899b2ae04636d34a4559d6ffdde0e186f3bd8045282a832868491c608e6f"

Expand Down Expand Up @@ -126,6 +134,10 @@ hash = "37c0c4f2dd328ed18a8f29fb73ca7c82c4008e49ea00243ffb3e024873a5b565"
n_cases = 65536
hash = "dec7851b91f56833b8f917c7e3ba16a28cd2683c68dda6c021c5d3c5c2400a37"

["commonware_consensus::types::tests::conformance::CodecConformance<CodingCommitment>"]
n_cases = 65536
hash = "92466d1b53e3936ea90efb7e536c4c030f4d14e87d4a3cd8a507a7906adb3fbf"

["commonware_consensus::types::tests::conformance::CodecConformance<Epoch>"]
n_cases = 65536
hash = "56ab85978136cb50f12ea89c8d25ce24451788172fe9d3d3c063f7ae6342d279"
Expand Down
2 changes: 1 addition & 1 deletion consensus/fuzz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ cargo-fuzz = true
arbitrary = { workspace = true, features = ["derive"] }
bytes.workspace = true
commonware-codec.workspace = true
commonware-consensus = { workspace = true, features = ["arbitrary", "fuzz"] }
commonware-consensus = { workspace = true, features = ["mocks", "arbitrary"] }
commonware-cryptography.workspace = true
commonware-macros.workspace = true
commonware-math.workspace = true
Expand Down
7 changes: 0 additions & 7 deletions consensus/src/application/mod.rs

This file was deleted.

20 changes: 9 additions & 11 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)]

use commonware_codec::Codec;
use commonware_cryptography::{Committable, Digestible};
use commonware_cryptography::Digestible;

pub mod aggregation;
pub mod ordered_broadcast;
Expand All @@ -37,25 +37,23 @@ pub trait Viewable {
/// Block is the interface for a block in the blockchain.
///
/// Blocks are used to track the progress of the consensus engine.
pub trait Block: Codec + Digestible + Committable + Send + Sync + 'static {
pub trait Block: Codec + Digestible + Send + Sync + 'static {
/// Get the height of the block.
fn height(&self) -> u64;

/// Get the parent block's digest.
fn parent(&self) -> Self::Commitment;
fn parent(&self) -> Self::Digest;
}

cfg_if::cfg_if! {
if #[cfg(not(target_arch = "wasm32"))] {
use commonware_cryptography::Digest;
use commonware_cryptography::{Digest, certificate::Scheme};
use futures::channel::{oneshot, mpsc};
use std::future::Future;
use commonware_runtime::{Spawner, Metrics, Clock};
use rand::Rng;
use crate::marshal::ingress::mailbox::AncestorStream;
use commonware_cryptography::certificate::Scheme;
use crate::marshal::ancestry::{AncestorStream, AncestryProvider};

pub mod application;
pub mod marshal;
mod reporter;
pub use reporter::*;
Expand Down Expand Up @@ -150,10 +148,10 @@ cfg_if::cfg_if! {

/// Build a new block on top of the provided parent ancestry. If the build job fails,
/// the implementor should return [None].
fn propose(
fn propose<A: AncestryProvider<Block = Self::Block>>(
&mut self,
context: (E, Self::Context),
ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
ancestry: AncestorStream<A, Self::Block>,
) -> impl Future<Output = Option<Self::Block>> + Send;
}

Expand All @@ -168,10 +166,10 @@ cfg_if::cfg_if! {
E: Rng + Spawner + Metrics + Clock
{
/// Verify a block produced by the application's proposer, relative to its ancestry.
fn verify(
fn verify<A: AncestryProvider<Block = Self::Block>>(
&mut self,
context: (E, Self::Context),
ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
ancestry: AncestorStream<A, Self::Block>,
) -> impl Future<Output = bool> + Send;
}

Expand Down
202 changes: 202 additions & 0 deletions consensus/src/marshal/ancestry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
//! A stream that yields the ancestors of a block while prefetching parents.

use crate::Block;
use commonware_cryptography::Digestible;
use futures::{
future::{BoxFuture, OptionFuture},
FutureExt, Stream,
};
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

/// An interface for providing ancestors.
pub trait AncestryProvider: Clone + Send + 'static {
/// The block type the provider fetches.
type Block: Block;

/// A request to retrieve a block by its digest.
///
/// If the block is found available locally, the block will be returned immediately.
///
/// If the block is not available locally, the request will be registered and the caller will
/// be notified when the block is available. If the block is not finalized, it's possible that
/// it may never become available.
fn fetch_block(
self,
digest: <Self::Block as Digestible>::Digest,
) -> impl Future<Output = Self::Block> + Send;
}

/// Yields the ancestors of a block while prefetching parents, _not_ including the genesis block.
///
/// TODO(clabby): Once marshal can also yield the genesis block, this stream should end
/// at block height 0 rather than 1.
#[pin_project]
pub struct AncestorStream<M, B: Block> {
buffered: Vec<B>,
marshal: M,
#[pin]
pending: OptionFuture<BoxFuture<'static, B>>,
}

impl<M, B: Block> AncestorStream<M, B> {
/// Creates a new [AncestorStream] starting from the given ancestry.
///
/// # Panics
///
/// Panics if the initial blocks are not contiguous in height.
pub(crate) fn new(marshal: M, initial: impl IntoIterator<Item = B>) -> Self {
let mut buffered = initial.into_iter().collect::<Vec<B>>();
buffered.sort_by_key(Block::height);

// Check that the initial blocks are contiguous in height.
buffered.windows(2).for_each(|window| {
assert_eq!(
window[0].height() + 1,
window[1].height(),
"initial blocks must be contiguous in height"
);
});

Self {
marshal,
buffered,
pending: None.into(),
}
}
}

impl<M, B> Stream for AncestorStream<M, B>
where
M: AncestryProvider<Block = B>,
B: Block,
{
type Item = B;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Because marshal cannot currently yield the genesis block, we stop at height 1.
const END_BOUND: u64 = 1;

let mut this = self.project();

// If a result has been buffered, return it and queue the parent fetch if needed.
if let Some(block) = this.buffered.pop() {
let height = block.height();
let should_fetch_parent = height > END_BOUND;
let end_of_buffered = this.buffered.is_empty();
if should_fetch_parent && end_of_buffered {
let parent_digest = block.parent();
let future = this.marshal.clone().fetch_block(parent_digest).boxed();
*this.pending.as_mut() = Some(future).into();

// Explicitly poll the next future to kick off the fetch. If it's already ready,
// buffer it for the next poll.
if let Poll::Ready(Some(block)) = this.pending.as_mut().poll(cx) {
this.buffered.push(block);
}
} else if !should_fetch_parent {
// No more parents to fetch; Finish the stream.
*this.pending.as_mut() = None.into();
}

return Poll::Ready(Some(block));
}

match this.pending.as_mut().poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(block)) => {
let height = block.height();
let should_fetch_parent = height > END_BOUND;
if should_fetch_parent {
let parent_digest = block.parent();
let future = this.marshal.clone().fetch_block(parent_digest).boxed();
*this.pending.as_mut() = Some(future).into();

// Explicitly poll the next future to kick off the fetch. If it's already ready,
// buffer it for the next poll.
if let Poll::Ready(Some(block)) = this.pending.as_mut().poll(cx) {
this.buffered.push(block);
}
} else {
// No more parents to fetch; Finish the stream.
*this.pending.as_mut() = None.into();
}

Poll::Ready(Some(block))
}
}
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::marshal::mocks::block::Block;
use commonware_cryptography::{sha256::Digest as Sha256Digest, Digest, Sha256};
use commonware_macros::test_async;
use futures::StreamExt;

#[derive(Default, Clone)]
struct MockProvider(Vec<Block<Sha256Digest>>);
impl AncestryProvider for MockProvider {
type Block = Block<Sha256Digest>;

async fn fetch_block(self, digest: Sha256Digest) -> Self::Block {
self.0
.into_iter()
.find(|b| b.digest() == digest)
.expect("block not found in mock provider")
}
}

#[test]
#[should_panic = "initial blocks must be contiguous in height"]
fn test_panics_on_non_contiguous_initial_blocks() {
AncestorStream::new(
MockProvider::default(),
vec![
Block::new::<Sha256>(Sha256Digest::EMPTY, 1, 1),
Block::new::<Sha256>(Sha256Digest::EMPTY, 3, 3),
],
);
}

#[test_async]
async fn test_empty_yields_none() {
let mut stream: AncestorStream<MockProvider, Block<Sha256Digest>> =
AncestorStream::new(MockProvider::default(), vec![]);
assert_eq!(stream.next().await, None);
}

#[test_async]
async fn test_yields_ancestors() {
let block1 = Block::new::<Sha256>(Sha256Digest::EMPTY, 1, 1);
let block2 = Block::new::<Sha256>(block1.digest(), 2, 2);
let block3 = Block::new::<Sha256>(block2.digest(), 3, 3);

let provider = MockProvider(vec![block1.clone(), block2.clone()]);
let stream = AncestorStream::new(provider, [block3.clone()]);

let results = stream.collect::<Vec<_>>().await;
assert_eq!(results, vec![block3, block2, block1]);
}

#[test_async]
async fn test_yields_ancestors_all_buffered() {
let block1 = Block::new::<Sha256>(Sha256Digest::EMPTY, 1, 1);
let block2 = Block::new::<Sha256>(block1.digest(), 2, 2);
let block3 = Block::new::<Sha256>(block2.digest(), 3, 3);

let provider = MockProvider(vec![]);
let stream =
AncestorStream::new(provider, [block1.clone(), block2.clone(), block3.clone()]);

let results = stream.collect::<Vec<_>>().await;
assert_eq!(results, vec![block3, block2, block1]);
}
}
Loading
Loading