Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion consensus/fuzz/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ fn run<P: Simplex>(input: FuzzInput) {
scheme: schemes[i].clone(),
elector,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition: validator.to_string(),
mailbox_size: 1024,
Expand Down
67 changes: 8 additions & 59 deletions consensus/src/application/marshaled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//! # Usage
//!
//! Wrap your application implementation with [Marshaled::new] and provide it to your
//! consensus engine for the [Automaton] and [Relay]. The wrapper handles all epoch logic transparently.
//! consensus engine as the [Automaton]. The wrapper handles all epoch logic transparently.
//!
//! ```rust,ignore
//! let application = Marshaled::new(
Expand All @@ -37,22 +37,20 @@ use crate::{
marshal::{self, ingress::mailbox::AncestorStream, Update},
simplex::types::Context,
types::{Epoch, Epocher, Round},
Application, Automaton, Block, CertifiableAutomaton, Epochable, Relay, Reporter,
VerifyingApplication,
Application, Automaton, Block, CertifiableAutomaton, Epochable, Reporter, VerifyingApplication,
};
use commonware_cryptography::certificate::Scheme;
use commonware_runtime::{telemetry::metrics::status::GaugeExt, Clock, Metrics, Spawner};
use commonware_utils::futures::ClosedExt;
use futures::{
channel::oneshot::{self, Canceled},
future::{select, try_join, Either, Ready},
lock::Mutex,
pin_mut,
};
use prometheus_client::metrics::gauge::Gauge;
use rand::Rng;
use std::{sync::Arc, time::Instant};
use tracing::{debug, warn};
use std::time::Instant;
use tracing::debug;

/// An [Application] adapter that handles epoch transitions and validates block ancestry.
///
Expand Down Expand Up @@ -86,7 +84,6 @@ where
application: A,
marshal: marshal::Mailbox<S, B>,
epocher: ES,
last_built: Arc<Mutex<Option<(Round, B)>>>,

build_duration: Gauge,
}
Expand All @@ -113,7 +110,6 @@ where
application,
marshal,
epocher,
last_built: Arc::new(Mutex::new(None)),

build_duration,
}
Expand Down Expand Up @@ -172,15 +168,14 @@ where
/// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
///
/// The proposal operation is spawned in a background task and returns a receiver that will
/// contain the proposed block's commitment when ready. The built block is cached for later
/// broadcasting.
/// contain the proposed block's commitment when ready. The built block is persisted and
/// broadcast to the network before returning.
async fn propose(
&mut self,
consensus_context: Context<Self::Digest, S::PublicKey>,
) -> oneshot::Receiver<Self::Digest> {
let mut marshal = self.marshal.clone();
let mut application = self.application.clone();
let last_built = self.last_built.clone();
let epocher = self.epocher.clone();

// Metrics
Expand Down Expand Up @@ -229,10 +224,7 @@ where
.expect("current epoch should exist");
if parent.height() == last_in_epoch {
let digest = parent.commitment();
{
let mut lock = last_built.lock().await;
*lock = Some((consensus_context.round, parent));
}
marshal.proposed(consensus_context.round, parent).await;

let result = tx.send(digest);
debug!(
Expand Down Expand Up @@ -273,10 +265,7 @@ where
let _ = build_duration.try_set(start.elapsed().as_millis());

let digest = built_block.commitment();
{
let mut lock = last_built.lock().await;
*lock = Some((consensus_context.round, built_block));
}
marshal.proposed(consensus_context.round, built_block).await;

let result = tx.send(digest);
debug!(
Expand Down Expand Up @@ -444,46 +433,6 @@ where
// Uses default certify implementation which always returns true
}

impl<E, S, A, B, ES> Relay for Marshaled<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E, Block = B, Context = Context<B::Commitment, S::PublicKey>>,
B: Block,
ES: Epocher,
{
type Digest = B::Commitment;

/// Broadcasts a previously built block to the network.
///
/// This uses the cached block from the last proposal operation. If no block was built or
/// the commitment does not match the cached block, the broadcast is skipped with a warning.
async fn broadcast(&mut self, commitment: Self::Digest) {
let Some((round, block)) = self.last_built.lock().await.clone() else {
warn!("missing block to broadcast");
return;
};

if block.commitment() != commitment {
warn!(
round = %round,
commitment = %block.commitment(),
height = block.height(),
"skipping requested broadcast of block with mismatched commitment"
);
return;
}

debug!(
round = %round,
commitment = %block.commitment(),
height = block.height(),
"requested broadcast of built block"
);
self.marshal.proposed(round, block).await;
}
}

impl<E, S, A, B, ES> Reporter for Marshaled<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
Expand Down
16 changes: 0 additions & 16 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,22 +175,6 @@ cfg_if::cfg_if! {
) -> impl Future<Output = bool> + Send;
}

/// Relay is the interface responsible for broadcasting payloads to the network.
///
/// The consensus engine is only aware of a payload's digest, not its contents. It is up
/// to the relay to efficiently broadcast the full payload to other participants.
pub trait Relay: Clone + Send + 'static {
/// Hash of an arbitrary payload.
type Digest: Digest;

/// Called once consensus begins working towards a proposal provided by `Automaton` (i.e.
/// it isn't dropped).
///
/// Other participants may not begin voting on a proposal until they have the full contents,
/// so timely delivery often yields better performance.
fn broadcast(&mut self, payload: Self::Digest) -> impl Future<Output = ()> + Send;
}

/// Reporter is the interface responsible for reporting activity to some external actor.
pub trait Reporter: Clone + Send + 'static {
/// Activity is specified by the underlying consensus implementation and can be interpreted if desired.
Expand Down
6 changes: 1 addition & 5 deletions consensus/src/ordered_broadcast/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::types::{Activity, Context, SequencersProvider};
use crate::{
types::{Epoch, EpochDelta},
Automaton, Monitor, Relay, Reporter,
Automaton, Monitor, Reporter,
};
use commonware_cryptography::{certificate::Provider, Digest, Signer};
use commonware_runtime::buffer::PoolRef;
Expand All @@ -14,7 +14,6 @@ pub struct Config<
P: Provider<Scope = Epoch>,
D: Digest,
A: Automaton<Context = Context<C::PublicKey>, Digest = D>,
R: Relay<Digest = D>,
Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
M: Monitor<Index = Epoch>,
> {
Expand All @@ -30,9 +29,6 @@ pub struct Config<
/// Proposes and verifies digests.
pub automaton: A,

/// Broadcasts the raw payload.
pub relay: R,

/// Notified when a chunk receives a quorum of acks.
pub reporter: Z,

Expand Down
13 changes: 3 additions & 10 deletions consensus/src/ordered_broadcast/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::{
};
use crate::{
types::{Epoch, EpochDelta},
Automaton, Monitor, Relay, Reporter,
Automaton, Monitor, Reporter,
};
use commonware_codec::Encode;
use commonware_cryptography::{
Expand Down Expand Up @@ -68,7 +68,6 @@ pub struct Engine<
P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D>>,
D: Digest,
A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
R: Relay<Digest = D>,
Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
M: Monitor<Index = Epoch>,
> {
Expand All @@ -80,7 +79,6 @@ pub struct Engine<
sequencers_provider: S,
validators_provider: P,
automaton: A,
relay: R,
monitor: M,
reporter: Z,

Expand Down Expand Up @@ -204,13 +202,12 @@ impl<
P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D, PublicKey = C::PublicKey>>,
D: Digest,
A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
R: Relay<Digest = D>,
Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
M: Monitor<Index = Epoch>,
> Engine<E, C, S, P, D, A, R, Z, M>
> Engine<E, C, S, P, D, A, Z, M>
{
/// Creates a new engine with the given context and configuration.
pub fn new(context: E, cfg: Config<C, S, P, D, A, R, Z, M>) -> Self {
pub fn new(context: E, cfg: Config<C, S, P, D, A, Z, M>) -> Self {
// TODO(#1833): Metrics should use the post-start context
let metrics = metrics::Metrics::init(context.clone());

Expand All @@ -220,7 +217,6 @@ impl<
sequencers_provider: cfg.sequencers_provider,
validators_provider: cfg.validators_provider,
automaton: cfg.automaton,
relay: cfg.relay,
reporter: cfg.reporter,
monitor: cfg.monitor,
namespace: cfg.namespace,
Expand Down Expand Up @@ -835,9 +831,6 @@ impl<
};
let validators = scheme.participants();

// Tell the relay to broadcast the full data
self.relay.broadcast(node.chunk.payload).await;

// Send the node to all validators
node_sender
.send(
Expand Down
9 changes: 1 addition & 8 deletions consensus/src/ordered_broadcast/mocks/automaton.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{ordered_broadcast::types::Context, types::Epoch, Automaton as A, Relay as R};
use crate::{ordered_broadcast::types::Context, types::Epoch, Automaton as A};
use bytes::Bytes;
use commonware_cryptography::{sha256, Hasher, PublicKey, Sha256};
use futures::channel::oneshot;
Expand Down Expand Up @@ -58,10 +58,3 @@ impl<P: PublicKey> A for Automaton<P> {
receiver
}
}

impl<P: PublicKey> R for Automaton<P> {
type Digest = sha256::Digest;
async fn broadcast(&mut self, payload: Self::Digest) {
trace!(?payload, "broadcast");
}
}
4 changes: 0 additions & 4 deletions consensus/src/ordered_broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ mod tests {
sequencers_provider: sequencers,
validators_provider,
automaton: automaton.clone(),
relay: automaton.clone(),
reporter: reporters.get(validator).unwrap().clone(),
monitor,
namespace: namespace.to_vec(),
Expand Down Expand Up @@ -735,7 +734,6 @@ mod tests {
sequencer_signer: Some(fixture.private_keys[idx].clone()),
sequencers_provider: sequencers,
validators_provider,
relay: automaton.clone(),
automaton: automaton.clone(),
reporter: reporters.get(validator).unwrap().clone(),
monitor,
Expand Down Expand Up @@ -886,7 +884,6 @@ mod tests {
sequencer_signer: None::<PrivateKey>, // Validators don't propose in this test
sequencers_provider: sequencers,
validators_provider,
relay: automaton.clone(),
automaton: automaton.clone(),
reporter: reporters.get(validator).unwrap().clone(),
monitor,
Expand Down Expand Up @@ -935,7 +932,6 @@ mod tests {
sequencer.public_key()
]),
validators_provider,
relay: automaton.clone(),
automaton,
reporter: reporters.get(&sequencer.public_key()).unwrap().clone(),
monitor: mocks::Monitor::new(epoch),
Expand Down
13 changes: 3 additions & 10 deletions consensus/src/simplex/actors/voter/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
},
},
types::{Round as Rnd, View},
CertifiableAutomaton, Relay, Reporter, Viewable, LATENCY,
CertifiableAutomaton, Reporter, Viewable, LATENCY,
};
use commonware_codec::Read;
use commonware_cryptography::Digest;
Expand Down Expand Up @@ -98,14 +98,12 @@ pub struct Actor<
B: Blocker<PublicKey = S::PublicKey>,
D: Digest,
A: CertifiableAutomaton<Digest = D, Context = Context<D, S::PublicKey>>,
R: Relay,
F: Reporter<Activity = Activity<S, D>>,
> {
context: ContextCell<E>,
state: State<E, S, L, D>,
blocker: B,
automaton: A,
relay: R,
reporter: F,

certificate_config: <S::Certificate as Read>::Cfg,
Expand All @@ -130,11 +128,10 @@ impl<
B: Blocker<PublicKey = S::PublicKey>,
D: Digest,
A: CertifiableAutomaton<Digest = D, Context = Context<D, S::PublicKey>>,
R: Relay<Digest = D>,
F: Reporter<Activity = Activity<S, D>>,
> Actor<E, S, L, B, D, A, R, F>
> Actor<E, S, L, B, D, A, F>
{
pub fn new(context: E, cfg: Config<S, L, B, D, A, R, F>) -> (Self, Mailbox<S, D>) {
pub fn new(context: E, cfg: Config<S, L, B, D, A, F>) -> (Self, Mailbox<S, D>) {
// Assert correctness of timeouts
if cfg.leader_timeout > cfg.notarization_timeout {
panic!("leader timeout must be less than or equal to notarization timeout");
Expand Down Expand Up @@ -183,7 +180,6 @@ impl<
state,
blocker: cfg.blocker,
automaton: cfg.automaton,
relay: cfg.relay,
reporter: cfg.reporter,

certificate_config,
Expand Down Expand Up @@ -925,9 +921,6 @@ impl<
continue;
}
view = self.state.current_view();

// Notify application of proposal
self.relay.broadcast(proposed).await;
},
(context, verified) = verify_wait => {
// Clear verify waiter
Expand Down
Loading
Loading