Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion consensus/fuzz/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ fn run<P: Simplex>(input: FuzzInput) {
scheme: schemes[i].clone(),
elector,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition: validator.to_string(),
mailbox_size: 1024,
Expand Down
67 changes: 8 additions & 59 deletions consensus/src/application/marshaled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//! # Usage
//!
//! Wrap your application implementation with [Marshaled::new] and provide it to your
//! consensus engine for the [Automaton] and [Relay]. The wrapper handles all epoch logic transparently.
//! consensus engine as the [Automaton]. The wrapper handles all epoch logic transparently.
//!
//! ```rust,ignore
//! let application = Marshaled::new(
Expand All @@ -37,22 +37,20 @@ use crate::{
marshal::{self, ingress::mailbox::AncestorStream, Update},
simplex::types::Context,
types::{Epoch, Epocher, Round},
Application, Automaton, Block, CertifiableAutomaton, Epochable, Relay, Reporter,
VerifyingApplication,
Application, Automaton, Block, CertifiableAutomaton, Epochable, Reporter, VerifyingApplication,
};
use commonware_cryptography::certificate::Scheme;
use commonware_runtime::{telemetry::metrics::status::GaugeExt, Clock, Metrics, Spawner};
use commonware_utils::futures::ClosedExt;
use futures::{
channel::oneshot::{self, Canceled},
future::{select, try_join, Either, Ready},
lock::Mutex,
pin_mut,
};
use prometheus_client::metrics::gauge::Gauge;
use rand::Rng;
use std::{sync::Arc, time::Instant};
use tracing::{debug, warn};
use std::time::Instant;
use tracing::debug;

/// An [Application] adapter that handles epoch transitions and validates block ancestry.
///
Expand Down Expand Up @@ -86,7 +84,6 @@ where
application: A,
marshal: marshal::Mailbox<S, B>,
epocher: ES,
last_built: Arc<Mutex<Option<(Round, B)>>>,

build_duration: Gauge,
}
Expand All @@ -113,7 +110,6 @@ where
application,
marshal,
epocher,
last_built: Arc::new(Mutex::new(None)),

build_duration,
}
Expand Down Expand Up @@ -172,15 +168,14 @@ where
/// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
///
/// The proposal operation is spawned in a background task and returns a receiver that will
/// contain the proposed block's commitment when ready. The built block is cached for later
/// broadcasting.
/// contain the proposed block's commitment when ready. The built block is handed to marshal
/// (to be persisted and broadcast) before the commitment is sent on that receiver.
async fn propose(
&mut self,
consensus_context: Context<Self::Digest, S::PublicKey>,
) -> oneshot::Receiver<Self::Digest> {
let mut marshal = self.marshal.clone();
let mut application = self.application.clone();
let last_built = self.last_built.clone();
let epocher = self.epocher.clone();

// Metrics
Expand Down Expand Up @@ -229,10 +224,7 @@ where
.expect("current epoch should exist");
if parent.height() == last_in_epoch {
let digest = parent.commitment();
{
let mut lock = last_built.lock().await;
*lock = Some((consensus_context.round, parent));
}
marshal.proposed(consensus_context.round, parent).await;

let result = tx.send(digest);
debug!(
Expand Down Expand Up @@ -273,10 +265,7 @@ where
let _ = build_duration.try_set(start.elapsed().as_millis());

let digest = built_block.commitment();
{
let mut lock = last_built.lock().await;
*lock = Some((consensus_context.round, built_block));
}
marshal.proposed(consensus_context.round, built_block).await;

let result = tx.send(digest);
debug!(
Expand Down Expand Up @@ -444,46 +433,6 @@ where
// Uses default certify implementation which always returns true
}

impl<E, S, A, B, ES> Relay for Marshaled<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: Application<E, Block = B, Context = Context<B::Commitment, S::PublicKey>>,
    B: Block,
    ES: Epocher,
{
    type Digest = B::Commitment;

    /// Forwards the most recently cached block to marshal via `proposed`.
    ///
    /// The block and its round are read from the `last_built` cache. Nothing is
    /// forwarded, and a warning is logged, when the cache is empty or when the
    /// cached block's commitment differs from `commitment`.
    async fn broadcast(&mut self, commitment: Self::Digest) {
        // Snapshot the cache. The lock guard is a temporary of this statement, so
        // it is released before we await on marshal below.
        let cached = self.last_built.lock().await.clone();
        let (round, block) = match cached {
            Some(entry) => entry,
            None => {
                warn!("missing block to broadcast");
                return;
            }
        };

        if block.commitment() == commitment {
            debug!(
                round = %round,
                commitment = %block.commitment(),
                height = block.height(),
                "requested broadcast of built block"
            );
            self.marshal.proposed(round, block).await;
        } else {
            // The engine asked for a payload other than the one we last built.
            warn!(
                round = %round,
                commitment = %block.commitment(),
                height = block.height(),
                "skipping requested broadcast of block with mismatched commitment"
            );
        }
    }
}

impl<E, S, A, B, ES> Reporter for Marshaled<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
Expand Down
38 changes: 22 additions & 16 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,28 @@ cfg_if::cfg_if! {
}
}

/// RetryableAutomaton extends [Automaton] with the ability to re-broadcast payloads.
///
/// This trait is useful for consensus implementations (like `ordered_broadcast`) where
/// validators need the full payload to verify, and the payload may need to be
/// re-broadcast when the validator set changes (e.g., after an epoch transition)
/// or when acknowledgements are not received in time.
///
/// The engine itself only ever handles the payload's digest; making the payload
/// contents available to other participants is left to the implementation.
pub trait RetryableAutomaton: Automaton {
/// Re-broadcast the payload associated with the given digest.
///
/// Called by the engine when it needs to rebroadcast a proposal that hasn't
/// yet received a quorum of acks. This typically happens when:
/// - The rebroadcast timeout expires without achieving quorum
/// - The epoch changes, requiring the payload to be sent to new validators
///
/// The implementation should ensure the full payload data is made available
/// to validators who need to verify it.
///
/// `payload` is the digest of the proposal being rebroadcast (the
/// `ordered_broadcast` engine passes the payload digest of its current tip chunk).
///
/// The `ordered_broadcast` engine awaits the returned future before it
/// rebroadcasts its own signed message, so a slow implementation delays that
/// rebroadcast.
fn repropose(
&mut self,
payload: Self::Digest,
) -> impl Future<Output = ()> + Send;
}

/// Application is a minimal interface for standard implementations that operate over a stream
/// of epoched blocks.
pub trait Application<E>: Clone + Send + 'static
Expand Down Expand Up @@ -175,22 +197,6 @@ cfg_if::cfg_if! {
) -> impl Future<Output = bool> + Send;
}

/// Relay is the interface responsible for broadcasting payloads to the network.
///
/// The consensus engine is only aware of a payload's digest, not its contents. It is up
/// to the relay to efficiently broadcast the full payload to other participants.
///
/// Implementations must be `Clone + Send + 'static`.
pub trait Relay: Clone + Send + 'static {
/// Hash of an arbitrary payload.
type Digest: Digest;

/// Called once consensus begins working towards a proposal provided by `Automaton` (i.e.
/// it isn't dropped).
///
/// Other participants may not begin voting on a proposal until they have the full contents,
/// so timely delivery often yields better performance.
///
/// `payload` is the digest identifying the payload to broadcast. The returned future
/// yields no value, so this interface gives the caller no way to observe a failed
/// broadcast.
fn broadcast(&mut self, payload: Self::Digest) -> impl Future<Output = ()> + Send;
}

/// Reporter is the interface responsible for reporting activity to some external actor.
pub trait Reporter: Clone + Send + 'static {
/// Activity is specified by the underlying consensus implementation and can be interpreted if desired.
Expand Down
19 changes: 13 additions & 6 deletions consensus/src/ordered_broadcast/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::types::{Activity, Context, SequencersProvider};
use crate::{
types::{Epoch, EpochDelta},
Automaton, Monitor, Relay, Reporter,
Monitor, Reporter, RetryableAutomaton,
};
use commonware_cryptography::{certificate::Provider, Digest, Signer};
use commonware_runtime::buffer::PoolRef;
Expand All @@ -13,8 +13,7 @@ pub struct Config<
S: SequencersProvider,
P: Provider<Scope = Epoch>,
D: Digest,
A: Automaton<Context = Context<C::PublicKey>, Digest = D>,
R: Relay<Digest = D>,
A: RetryableAutomaton<Context = Context<C::PublicKey>, Digest = D>,
Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
M: Monitor<Index = Epoch>,
> {
Expand All @@ -28,11 +27,16 @@ pub struct Config<
pub validators_provider: P,

/// Proposes and verifies digests.
///
/// The automaton is responsible for broadcasting the full payload data to other
/// participants during `propose()`. The engine only broadcasts signed digest
/// references (Node messages), not the payloads themselves. Validators need
/// access to the full payload to verify it.
///
/// The engine will call `repropose()` when it needs to rebroadcast a payload
/// (e.g., after an epoch change when new validators need the payload).
pub automaton: A,

/// Broadcasts the raw payload.
pub relay: R,

/// Notified when a chunk receives a quorum of acks.
pub reporter: Z,

Expand All @@ -51,6 +55,9 @@ pub struct Config<
pub priority_acks: bool,

/// How often a proposal is rebroadcast to all validators if no quorum is reached.
///
/// On rebroadcast the engine first calls `repropose()` on the automaton and then resends
/// the signed digest reference (Node message). The engine never sends the full payload
/// itself; making it available again is the automaton's responsibility.
pub rebroadcast_timeout: Duration,

/// A tuple representing the epochs to keep in memory.
Expand Down
22 changes: 9 additions & 13 deletions consensus/src/ordered_broadcast/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::{
};
use crate::{
types::{Epoch, EpochDelta},
Automaton, Monitor, Relay, Reporter,
Monitor, Reporter, RetryableAutomaton,
};
use commonware_codec::Encode;
use commonware_cryptography::{
Expand Down Expand Up @@ -67,8 +67,7 @@ pub struct Engine<
S: SequencersProvider<PublicKey = C::PublicKey>,
P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D>>,
D: Digest,
A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
R: Relay<Digest = D>,
A: RetryableAutomaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
M: Monitor<Index = Epoch>,
> {
Expand All @@ -80,7 +79,6 @@ pub struct Engine<
sequencers_provider: S,
validators_provider: P,
automaton: A,
relay: R,
monitor: M,
reporter: Z,

Expand Down Expand Up @@ -203,14 +201,13 @@ impl<
S: SequencersProvider<PublicKey = C::PublicKey>,
P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D, PublicKey = C::PublicKey>>,
D: Digest,
A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
R: Relay<Digest = D>,
A: RetryableAutomaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
M: Monitor<Index = Epoch>,
> Engine<E, C, S, P, D, A, R, Z, M>
> Engine<E, C, S, P, D, A, Z, M>
{
/// Creates a new engine with the given context and configuration.
pub fn new(context: E, cfg: Config<C, S, P, D, A, R, Z, M>) -> Self {
pub fn new(context: E, cfg: Config<C, S, P, D, A, Z, M>) -> Self {
// TODO(#1833): Metrics should use the post-start context
let metrics = metrics::Metrics::init(context.clone());

Expand All @@ -220,7 +217,6 @@ impl<
sequencers_provider: cfg.sequencers_provider,
validators_provider: cfg.validators_provider,
automaton: cfg.automaton,
relay: cfg.relay,
reporter: cfg.reporter,
monitor: cfg.monitor,
namespace: cfg.namespace,
Expand Down Expand Up @@ -815,7 +811,10 @@ impl<
return Err(Error::AlreadyCertified);
}

// Broadcast the message, which resets the rebroadcast deadline
// Ask the application to re-broadcast the payload (e.g., to new validators after epoch change)
self.automaton.repropose(tip.chunk.payload).await;

// Broadcast the Node message, which resets the rebroadcast deadline
guard.set(Status::Failure);
self.broadcast(tip, node_sender, self.epoch).await?;
guard.set(Status::Success);
Expand All @@ -835,9 +834,6 @@ impl<
};
let validators = scheme.participants();

// Tell the relay to broadcast the full data
self.relay.broadcast(node.chunk.payload).await;

// Send the node to all validators
node_sender
.send(
Expand Down
11 changes: 6 additions & 5 deletions consensus/src/ordered_broadcast/mocks/automaton.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::{ordered_broadcast::types::Context, types::Epoch, Automaton as A, Relay as R};
use crate::{
ordered_broadcast::types::Context, types::Epoch, Automaton as A, RetryableAutomaton as RA,
};
use bytes::Bytes;
use commonware_cryptography::{sha256, Hasher, PublicKey, Sha256};
use futures::channel::oneshot;
Expand Down Expand Up @@ -59,9 +61,8 @@ impl<P: PublicKey> A for Automaton<P> {
}
}

impl<P: PublicKey> R for Automaton<P> {
type Digest = sha256::Digest;
async fn broadcast(&mut self, payload: Self::Digest) {
trace!(?payload, "broadcast");
/// Mock `RetryableAutomaton` implementation for the test automaton.
impl<P: PublicKey> RA for Automaton<P> {
/// Records the re-proposal request as a trace event; the mock does nothing
/// else with `payload`.
async fn repropose(&mut self, payload: Self::Digest) {
trace!(?payload, "repropose");
}
}
Loading