Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion consensus/fuzz/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ fn run<P: Simplex>(input: FuzzInput) {
scheme: schemes[i].clone(),
elector,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition: validator.to_string(),
mailbox_size: 1024,
Expand Down
67 changes: 8 additions & 59 deletions consensus/src/application/marshaled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//! # Usage
//!
//! Wrap your application implementation with [Marshaled::new] and provide it to your
//! consensus engine for the [Automaton] and [Relay]. The wrapper handles all epoch logic transparently.
//! consensus engine as the [Automaton]. The wrapper handles all epoch logic transparently.
//!
//! ```rust,ignore
//! let application = Marshaled::new(
Expand All @@ -37,22 +37,20 @@ use crate::{
marshal::{self, ingress::mailbox::AncestorStream, Update},
simplex::types::Context,
types::{Epoch, Epocher, Round},
Application, Automaton, Block, CertifiableAutomaton, Epochable, Relay, Reporter,
VerifyingApplication,
Application, Automaton, Block, CertifiableAutomaton, Epochable, Reporter, VerifyingApplication,
};
use commonware_cryptography::certificate::Scheme;
use commonware_runtime::{telemetry::metrics::status::GaugeExt, Clock, Metrics, Spawner};
use commonware_utils::futures::ClosedExt;
use futures::{
channel::oneshot::{self, Canceled},
future::{select, try_join, Either, Ready},
lock::Mutex,
pin_mut,
};
use prometheus_client::metrics::gauge::Gauge;
use rand::Rng;
use std::{sync::Arc, time::Instant};
use tracing::{debug, warn};
use std::time::Instant;
use tracing::debug;

/// An [Application] adapter that handles epoch transitions and validates block ancestry.
///
Expand Down Expand Up @@ -86,7 +84,6 @@ where
application: A,
marshal: marshal::Mailbox<S, B>,
epocher: ES,
last_built: Arc<Mutex<Option<(Round, B)>>>,

build_duration: Gauge,
}
Expand All @@ -113,7 +110,6 @@ where
application,
marshal,
epocher,
last_built: Arc::new(Mutex::new(None)),

build_duration,
}
Expand Down Expand Up @@ -172,15 +168,14 @@ where
/// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
///
/// The proposal operation is spawned in a background task and returns a receiver that will
/// contain the proposed block's commitment when ready. The built block is cached for later
/// broadcasting.
/// contain the proposed block's commitment when ready. The built block is persisted and
/// broadcast to the network before returning.
async fn propose(
&mut self,
consensus_context: Context<Self::Digest, S::PublicKey>,
) -> oneshot::Receiver<Self::Digest> {
let mut marshal = self.marshal.clone();
let mut application = self.application.clone();
let last_built = self.last_built.clone();
let epocher = self.epocher.clone();

// Metrics
Expand Down Expand Up @@ -229,10 +224,7 @@ where
.expect("current epoch should exist");
if parent.height() == last_in_epoch {
let digest = parent.commitment();
{
let mut lock = last_built.lock().await;
*lock = Some((consensus_context.round, parent));
}
marshal.proposed(consensus_context.round, parent).await;

let result = tx.send(digest);
debug!(
Expand Down Expand Up @@ -273,10 +265,7 @@ where
let _ = build_duration.try_set(start.elapsed().as_millis());

let digest = built_block.commitment();
{
let mut lock = last_built.lock().await;
*lock = Some((consensus_context.round, built_block));
}
marshal.proposed(consensus_context.round, built_block).await;

let result = tx.send(digest);
debug!(
Expand Down Expand Up @@ -444,46 +433,6 @@ where
// Uses default certify implementation which always returns true
}

impl<E, S, A, B, ES> Relay for Marshaled<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E, Block = B, Context = Context<B::Commitment, S::PublicKey>>,
B: Block,
ES: Epocher,
{
type Digest = B::Commitment;

/// Broadcasts a previously built block to the network.
///
/// This uses the cached block from the last proposal operation. If no block was built or
/// the commitment does not match the cached block, the broadcast is skipped with a warning.
async fn broadcast(&mut self, commitment: Self::Digest) {
let Some((round, block)) = self.last_built.lock().await.clone() else {
warn!("missing block to broadcast");
return;
};

if block.commitment() != commitment {
warn!(
round = %round,
commitment = %block.commitment(),
height = block.height(),
"skipping requested broadcast of block with mismatched commitment"
);
return;
}

debug!(
round = %round,
commitment = %block.commitment(),
height = block.height(),
"requested broadcast of built block"
);
self.marshal.proposed(round, block).await;
}
}

impl<E, S, A, B, ES> Reporter for Marshaled<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
Expand Down
38 changes: 22 additions & 16 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,28 @@ cfg_if::cfg_if! {
}
}

/// RetryableAutomaton extends [Automaton] with the ability to re-broadcast payloads.
///
/// This trait is useful for consensus implementations (like ordered_broadcast) where
/// validators need the full payload to verify, and the payload may need to be
/// re-broadcast when the validator set changes (e.g., after an epoch transition)
/// or when acknowledgements are not received in time.
pub trait RetryableAutomaton: Automaton {
/// Re-broadcast the payload associated with the given digest.
///
/// Called by the engine when it needs to rebroadcast a proposal that hasn't
/// yet received a quorum of acks. This typically happens when:
/// - The rebroadcast timeout expires without achieving quorum
/// - The epoch changes, requiring the payload to be sent to new validators
///
/// The implementation should ensure the full payload data is made available
/// to validators who need to verify it.
fn repropose(
&mut self,
payload: Self::Digest,
) -> impl Future<Output = ()> + Send;
}

/// Application is a minimal interface for standard implementations that operate over a stream
/// of epoched blocks.
pub trait Application<E>: Clone + Send + 'static
Expand Down Expand Up @@ -175,22 +197,6 @@ cfg_if::cfg_if! {
) -> impl Future<Output = bool> + Send;
}

/// Relay is the interface responsible for broadcasting payloads to the network.
///
/// The consensus engine is only aware of a payload's digest, not its contents. It is up
/// to the relay to efficiently broadcast the full payload to other participants.
pub trait Relay: Clone + Send + 'static {
/// Hash of an arbitrary payload.
type Digest: Digest;

/// Called once consensus begins working towards a proposal provided by `Automaton` (i.e.
/// it isn't dropped).
///
/// Other participants may not begin voting on a proposal until they have the full contents,
/// so timely delivery often yields better performance.
fn broadcast(&mut self, payload: Self::Digest) -> impl Future<Output = ()> + Send;
}

/// Reporter is the interface responsible for reporting activity to some external actor.
pub trait Reporter: Clone + Send + 'static {
/// Activity is specified by the underlying consensus implementation and can be interpreted if desired.
Expand Down
19 changes: 13 additions & 6 deletions consensus/src/ordered_broadcast/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::types::{Activity, Context, SequencersProvider};
use crate::{
types::{Epoch, EpochDelta},
Automaton, Monitor, Relay, Reporter,
Monitor, Reporter, RetryableAutomaton,
};
use commonware_cryptography::{certificate::Provider, Digest, Signer};
use commonware_runtime::buffer::PoolRef;
Expand All @@ -13,8 +13,7 @@ pub struct Config<
S: SequencersProvider,
P: Provider<Scope = Epoch>,
D: Digest,
A: Automaton<Context = Context<C::PublicKey>, Digest = D>,
R: Relay<Digest = D>,
A: RetryableAutomaton<Context = Context<C::PublicKey>, Digest = D>,
Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
M: Monitor<Index = Epoch>,
> {
Expand All @@ -28,11 +27,16 @@ pub struct Config<
pub validators_provider: P,

/// Proposes and verifies digests.
///
/// The automaton is responsible for broadcasting the full payload data to other
/// participants during `propose()`. The engine only broadcasts signed digest
/// references (Node messages), not the payloads themselves. Validators need
/// access to the full payload to verify it.
///
/// The engine will call `repropose()` when it needs to rebroadcast a payload
/// (e.g., after an epoch change when new validators need the payload).
pub automaton: A,

/// Broadcasts the raw payload.
pub relay: R,

/// Notified when a chunk receives a quorum of acks.
pub reporter: Z,

Expand All @@ -51,6 +55,9 @@ pub struct Config<
pub priority_acks: bool,

/// How often a proposal is rebroadcast to all validators if no quorum is reached.
///
/// This controls rebroadcast of the signed digest reference (Node message), not
/// the full payload. Payload rebroadcast is the application's responsibility.
pub rebroadcast_timeout: Duration,

/// A tuple representing the epochs to keep in memory.
Expand Down
22 changes: 9 additions & 13 deletions consensus/src/ordered_broadcast/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::{
};
use crate::{
types::{Epoch, EpochDelta},
Automaton, Monitor, Relay, Reporter,
Monitor, Reporter, RetryableAutomaton,
};
use commonware_codec::Encode;
use commonware_cryptography::{
Expand Down Expand Up @@ -67,8 +67,7 @@ pub struct Engine<
S: SequencersProvider<PublicKey = C::PublicKey>,
P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D>>,
D: Digest,
A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
R: Relay<Digest = D>,
A: RetryableAutomaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
M: Monitor<Index = Epoch>,
> {
Expand All @@ -80,7 +79,6 @@ pub struct Engine<
sequencers_provider: S,
validators_provider: P,
automaton: A,
relay: R,
monitor: M,
reporter: Z,

Expand Down Expand Up @@ -203,14 +201,13 @@ impl<
S: SequencersProvider<PublicKey = C::PublicKey>,
P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D, PublicKey = C::PublicKey>>,
D: Digest,
A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
R: Relay<Digest = D>,
A: RetryableAutomaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
M: Monitor<Index = Epoch>,
> Engine<E, C, S, P, D, A, R, Z, M>
> Engine<E, C, S, P, D, A, Z, M>
{
/// Creates a new engine with the given context and configuration.
pub fn new(context: E, cfg: Config<C, S, P, D, A, R, Z, M>) -> Self {
pub fn new(context: E, cfg: Config<C, S, P, D, A, Z, M>) -> Self {
// TODO(#1833): Metrics should use the post-start context
let metrics = metrics::Metrics::init(context.clone());

Expand All @@ -220,7 +217,6 @@ impl<
sequencers_provider: cfg.sequencers_provider,
validators_provider: cfg.validators_provider,
automaton: cfg.automaton,
relay: cfg.relay,
reporter: cfg.reporter,
monitor: cfg.monitor,
namespace: cfg.namespace,
Expand Down Expand Up @@ -815,7 +811,10 @@ impl<
return Err(Error::AlreadyCertified);
}

// Broadcast the message, which resets the rebroadcast deadline
// Ask the application to re-broadcast the payload (e.g., to new validators after epoch change)
self.automaton.repropose(tip.chunk.payload).await;

// Broadcast the Node message, which resets the rebroadcast deadline
guard.set(Status::Failure);
self.broadcast(tip, node_sender, self.epoch).await?;
guard.set(Status::Success);
Expand All @@ -835,9 +834,6 @@ impl<
};
let validators = scheme.participants();

// Tell the relay to broadcast the full data
self.relay.broadcast(node.chunk.payload).await;

// Send the node to all validators
node_sender
.send(
Expand Down
9 changes: 4 additions & 5 deletions consensus/src/ordered_broadcast/mocks/automaton.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{ordered_broadcast::types::Context, types::Epoch, Automaton as A, Relay as R};
use crate::{ordered_broadcast::types::Context, types::Epoch, Automaton as A, RetryableAutomaton as RA};
use bytes::Bytes;
use commonware_cryptography::{sha256, Hasher, PublicKey, Sha256};
use futures::channel::oneshot;
Expand Down Expand Up @@ -59,9 +59,8 @@ impl<P: PublicKey> A for Automaton<P> {
}
}

impl<P: PublicKey> R for Automaton<P> {
type Digest = sha256::Digest;
async fn broadcast(&mut self, payload: Self::Digest) {
trace!(?payload, "broadcast");
impl<P: PublicKey> RA for Automaton<P> {
async fn repropose(&mut self, payload: Self::Digest) {
trace!(?payload, "repropose");
}
}
Loading