diff --git a/consensus/fuzz/src/lib.rs b/consensus/fuzz/src/lib.rs index f3203965a5..60f26105bb 100644 --- a/consensus/fuzz/src/lib.rs +++ b/consensus/fuzz/src/lib.rs @@ -240,7 +240,6 @@ fn run(input: FuzzInput) { scheme: schemes[i].clone(), elector, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, diff --git a/consensus/src/application/marshaled.rs b/consensus/src/application/marshaled.rs index 25d566bb1e..40ceced65d 100644 --- a/consensus/src/application/marshaled.rs +++ b/consensus/src/application/marshaled.rs @@ -16,7 +16,7 @@ //! # Usage //! //! Wrap your application implementation with [Marshaled::new] and provide it to your -//! consensus engine for the [Automaton] and [Relay]. The wrapper handles all epoch logic transparently. +//! consensus engine as the [Automaton]. The wrapper handles all epoch logic transparently. //! //! ```rust,ignore //! let application = Marshaled::new( @@ -37,8 +37,7 @@ use crate::{ marshal::{self, ingress::mailbox::AncestorStream, Update}, simplex::types::Context, types::{Epoch, Epocher, Round}, - Application, Automaton, Block, CertifiableAutomaton, Epochable, Relay, Reporter, - VerifyingApplication, + Application, Automaton, Block, CertifiableAutomaton, Epochable, Reporter, VerifyingApplication, }; use commonware_cryptography::certificate::Scheme; use commonware_runtime::{telemetry::metrics::status::GaugeExt, Clock, Metrics, Spawner}; @@ -46,13 +45,12 @@ use commonware_utils::futures::ClosedExt; use futures::{ channel::oneshot::{self, Canceled}, future::{select, try_join, Either, Ready}, - lock::Mutex, pin_mut, }; use prometheus_client::metrics::gauge::Gauge; use rand::Rng; -use std::{sync::Arc, time::Instant}; -use tracing::{debug, warn}; +use std::time::Instant; +use tracing::debug; /// An [Application] adapter that handles epoch transitions and validates block ancestry. /// @@ -86,7 +84,6 @@ where application: A, marshal: marshal::Mailbox, epocher: ES, - last_built: Arc>>, build_duration: Gauge, } @@ -113,7 +110,6 @@ where application, marshal, epocher, - last_built: Arc::new(Mutex::new(None)), build_duration, } @@ -172,15 +168,14 @@ where /// boundary block to avoid creating blocks that would be invalidated by the epoch transition. /// /// The proposal operation is spawned in a background task and returns a receiver that will - /// contain the proposed block's commitment when ready. The built block is cached for later - /// broadcasting. + /// contain the proposed block's commitment when ready. The built block is persisted and + /// broadcast to the network before returning. async fn propose( &mut self, consensus_context: Context, ) -> oneshot::Receiver { let mut marshal = self.marshal.clone(); let mut application = self.application.clone(); - let last_built = self.last_built.clone(); let epocher = self.epocher.clone(); // Metrics @@ -229,10 +224,7 @@ where .expect("current epoch should exist"); if parent.height() == last_in_epoch { let digest = parent.commitment(); - { - let mut lock = last_built.lock().await; - *lock = Some((consensus_context.round, parent)); - } + marshal.proposed(consensus_context.round, parent).await; let result = tx.send(digest); debug!( @@ -273,10 +265,7 @@ where let _ = build_duration.try_set(start.elapsed().as_millis()); let digest = built_block.commitment(); - { - let mut lock = last_built.lock().await; - *lock = Some((consensus_context.round, built_block)); - } + marshal.proposed(consensus_context.round, built_block).await; let result = tx.send(digest); debug!( @@ -444,46 +433,6 @@ where // Uses default certify implementation which always returns true } -impl Relay for Marshaled -where - E: Rng + Spawner + Metrics + Clock, - S: Scheme, - A: Application>, - B: Block, - ES: Epocher, -{ - type Digest = B::Commitment; - - /// Broadcasts a previously built block to the network. - /// - /// This uses the cached block from the last proposal operation. If no block was built or - /// the commitment does not match the cached block, the broadcast is skipped with a warning. - async fn broadcast(&mut self, commitment: Self::Digest) { - let Some((round, block)) = self.last_built.lock().await.clone() else { - warn!("missing block to broadcast"); - return; - }; - - if block.commitment() != commitment { - warn!( - round = %round, - commitment = %block.commitment(), - height = block.height(), - "skipping requested broadcast of block with mismatched commitment" - ); - return; - } - - debug!( - round = %round, - commitment = %block.commitment(), - height = block.height(), - "requested broadcast of built block" - ); - self.marshal.proposed(round, block).await; - } -} - impl Reporter for Marshaled where E: Rng + Spawner + Metrics + Clock, diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 00b50de8c8..5f4bd5132e 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -128,6 +128,28 @@ cfg_if::cfg_if! { } } + /// RetryableAutomaton extends [Automaton] with the ability to re-broadcast payloads. + /// + /// This trait is useful for consensus implementations (like ordered_broadcast) where + /// validators need the full payload to verify, and the payload may need to be + /// re-broadcast when the validator set changes (e.g., after an epoch transition) + /// or when acknowledgements are not received in time. + pub trait RetryableAutomaton: Automaton { + /// Re-broadcast the payload associated with the given digest. + /// + /// Called by the engine when it needs to rebroadcast a proposal that hasn't + /// yet received a quorum of acks. This typically happens when: + /// - The rebroadcast timeout expires without achieving quorum + /// - The epoch changes, requiring the payload to be sent to new validators + /// + /// The implementation should ensure the full payload data is made available + /// to validators who need to verify it. + fn repropose( + &mut self, + payload: Self::Digest, + ) -> impl Future + Send; + } + /// Application is a minimal interface for standard implementations that operate over a stream /// of epoched blocks. pub trait Application: Clone + Send + 'static @@ -175,22 +197,6 @@ cfg_if::cfg_if! { ) -> impl Future + Send; } - /// Relay is the interface responsible for broadcasting payloads to the network. - /// - /// The consensus engine is only aware of a payload's digest, not its contents. It is up - /// to the relay to efficiently broadcast the full payload to other participants. - pub trait Relay: Clone + Send + 'static { - /// Hash of an arbitrary payload. - type Digest: Digest; - - /// Called once consensus begins working towards a proposal provided by `Automaton` (i.e. - /// it isn't dropped). - /// - /// Other participants may not begin voting on a proposal until they have the full contents, - /// so timely delivery often yields better performance. - fn broadcast(&mut self, payload: Self::Digest) -> impl Future + Send; - } - /// Reporter is the interface responsible for reporting activity to some external actor. pub trait Reporter: Clone + Send + 'static { /// Activity is specified by the underlying consensus implementation and can be interpreted if desired. diff --git a/consensus/src/ordered_broadcast/config.rs b/consensus/src/ordered_broadcast/config.rs index 042e7244a5..59843c1d1c 100644 --- a/consensus/src/ordered_broadcast/config.rs +++ b/consensus/src/ordered_broadcast/config.rs @@ -1,7 +1,7 @@ use super::types::{Activity, Context, SequencersProvider}; use crate::{ types::{Epoch, EpochDelta}, - Automaton, Monitor, Relay, Reporter, + Monitor, Reporter, RetryableAutomaton, }; use commonware_cryptography::{certificate::Provider, Digest, Signer}; use commonware_runtime::buffer::PoolRef; @@ -13,8 +13,7 @@ pub struct Config< S: SequencersProvider, P: Provider, D: Digest, - A: Automaton, Digest = D>, - R: Relay, + A: RetryableAutomaton, Digest = D>, Z: Reporter>, M: Monitor, > { @@ -28,11 +27,16 @@ pub struct Config< pub validators_provider: P, /// Proposes and verifies digests. + /// + /// The automaton is responsible for broadcasting the full payload data to other + /// participants during `propose()`. The engine only broadcasts signed digest + /// references (Node messages), not the payloads themselves. Validators need + /// access to the full payload to verify it. + /// + /// The engine will call `repropose()` when it needs to rebroadcast a payload + /// (e.g., after an epoch change when new validators need the payload). pub automaton: A, - /// Broadcasts the raw payload. - pub relay: R, - /// Notified when a chunk receives a quorum of acks. pub reporter: Z, @@ -51,6 +55,9 @@ pub struct Config< pub priority_acks: bool, /// How often a proposal is rebroadcast to all validators if no quorum is reached. + /// + /// This controls rebroadcast of the signed digest reference (Node message), not + /// the full payload. Payload rebroadcast is the application's responsibility. pub rebroadcast_timeout: Duration, /// A tuple representing the epochs to keep in memory. diff --git a/consensus/src/ordered_broadcast/engine.rs b/consensus/src/ordered_broadcast/engine.rs index 84ccddeb94..b9810719c7 100644 --- a/consensus/src/ordered_broadcast/engine.rs +++ b/consensus/src/ordered_broadcast/engine.rs @@ -16,7 +16,7 @@ use super::{ }; use crate::{ types::{Epoch, EpochDelta}, - Automaton, Monitor, Relay, Reporter, + Monitor, Reporter, RetryableAutomaton, }; use commonware_codec::Encode; use commonware_cryptography::{ @@ -67,8 +67,7 @@ pub struct Engine< S: SequencersProvider, P: Provider>, D: Digest, - A: Automaton, Digest = D> + Clone, - R: Relay, + A: RetryableAutomaton, Digest = D> + Clone, Z: Reporter>, M: Monitor, > { @@ -80,7 +79,6 @@ pub struct Engine< sequencers_provider: S, validators_provider: P, automaton: A, - relay: R, monitor: M, reporter: Z, @@ -203,14 +201,13 @@ impl< S: SequencersProvider, P: Provider>, D: Digest, - A: Automaton, Digest = D> + Clone, - R: Relay, + A: RetryableAutomaton, Digest = D> + Clone, Z: Reporter>, M: Monitor, - > Engine + > Engine { /// Creates a new engine with the given context and configuration. - pub fn new(context: E, cfg: Config) -> Self { + pub fn new(context: E, cfg: Config) -> Self { // TODO(#1833): Metrics should use the post-start context let metrics = metrics::Metrics::init(context.clone()); @@ -220,7 +217,6 @@ impl< sequencers_provider: cfg.sequencers_provider, validators_provider: cfg.validators_provider, automaton: cfg.automaton, - relay: cfg.relay, reporter: cfg.reporter, monitor: cfg.monitor, namespace: cfg.namespace, @@ -815,7 +811,10 @@ impl< return Err(Error::AlreadyCertified); } - // Broadcast the message, which resets the rebroadcast deadline + // Ask the application to re-broadcast the payload (e.g., to new validators after epoch change) + self.automaton.repropose(tip.chunk.payload).await; + + // Broadcast the Node message, which resets the rebroadcast deadline guard.set(Status::Failure); self.broadcast(tip, node_sender, self.epoch).await?; guard.set(Status::Success); @@ -835,9 +834,6 @@ impl< }; let validators = scheme.participants(); - // Tell the relay to broadcast the full data - self.relay.broadcast(node.chunk.payload).await; - // Send the node to all validators node_sender .send( diff --git a/consensus/src/ordered_broadcast/mocks/automaton.rs b/consensus/src/ordered_broadcast/mocks/automaton.rs index 853f9dbf9b..b99bce222b 100644 --- a/consensus/src/ordered_broadcast/mocks/automaton.rs +++ b/consensus/src/ordered_broadcast/mocks/automaton.rs @@ -1,4 +1,6 @@ -use crate::{ordered_broadcast::types::Context, types::Epoch, Automaton as A, Relay as R}; +use crate::{ + ordered_broadcast::types::Context, types::Epoch, Automaton as A, RetryableAutomaton as RA, +}; use bytes::Bytes; use commonware_cryptography::{sha256, Hasher, PublicKey, Sha256}; use futures::channel::oneshot; @@ -59,9 +61,8 @@ impl A for Automaton

{ } } -impl R for Automaton

{ - type Digest = sha256::Digest; - async fn broadcast(&mut self, payload: Self::Digest) { - trace!(?payload, "broadcast"); +impl RA for Automaton

{ + async fn repropose(&mut self, payload: Self::Digest) { + trace!(?payload, "repropose"); } } diff --git a/consensus/src/ordered_broadcast/mocks/drop_first_automaton.rs b/consensus/src/ordered_broadcast/mocks/drop_first_automaton.rs new file mode 100644 index 0000000000..303d21696d --- /dev/null +++ b/consensus/src/ordered_broadcast/mocks/drop_first_automaton.rs @@ -0,0 +1,82 @@ +use crate::{ + ordered_broadcast::types::Context, types::Epoch, Automaton as A, RetryableAutomaton as RA, +}; +use bytes::Bytes; +use commonware_cryptography::{sha256, Hasher, PublicKey, Sha256}; +use futures::channel::oneshot; +use std::{ + collections::HashSet, + sync::{Arc, Mutex}, +}; +use tracing::trace; + +/// A mock automaton that simulates first broadcast failures. +/// +/// This automaton "drops" the first propose for each digest, only making the +/// payload available for verification after `repropose()` is called. This tests +/// that the engine correctly triggers repropose when initial broadcasts fail. +#[derive(Clone)] +pub struct DropFirstAutomaton { + delivered: Arc>>, + _phantom: std::marker::PhantomData

, +} + +impl DropFirstAutomaton

{ + pub fn new() -> Self { + Self { + delivered: Arc::new(Mutex::new(HashSet::new())), + _phantom: std::marker::PhantomData, + } + } +} + +impl Default for DropFirstAutomaton

{ + fn default() -> Self { + Self::new() + } +} + +impl A for DropFirstAutomaton

{ + type Context = Context

; + type Digest = sha256::Digest; + + async fn genesis(&mut self, _epoch: Epoch) -> Self::Digest { + unimplemented!() + } + + async fn propose(&mut self, context: Self::Context) -> oneshot::Receiver { + let (sender, receiver) = oneshot::channel(); + + let Self::Context { sequencer, height } = context; + let payload = Bytes::from(format!("hello world, {sequencer} {height}")); + let mut hasher = Sha256::default(); + hasher.update(&payload); + let digest = hasher.finalize(); + + trace!(?digest, "propose (dropping first broadcast)"); + + sender.send(digest).unwrap(); + receiver + } + + async fn verify( + &mut self, + context: Self::Context, + payload: Self::Digest, + ) -> oneshot::Receiver { + let (sender, receiver) = oneshot::channel(); + + let delivered = self.delivered.lock().unwrap().contains(&payload); + trace!(?context, ?payload, delivered, "verify"); + + sender.send(delivered).unwrap(); + receiver + } +} + +impl RA for DropFirstAutomaton

{ + async fn repropose(&mut self, payload: Self::Digest) { + trace!(?payload, "repropose (delivering payload)"); + self.delivered.lock().unwrap().insert(payload); + } +} diff --git a/consensus/src/ordered_broadcast/mocks/mod.rs b/consensus/src/ordered_broadcast/mocks/mod.rs index 95de16330a..cb3526346e 100644 --- a/consensus/src/ordered_broadcast/mocks/mod.rs +++ b/consensus/src/ordered_broadcast/mocks/mod.rs @@ -2,6 +2,8 @@ mod automaton; pub use automaton::Automaton; +mod drop_first_automaton; +pub use drop_first_automaton::DropFirstAutomaton; mod monitor; pub use monitor::Monitor; mod reporter; diff --git a/consensus/src/ordered_broadcast/mod.rs b/consensus/src/ordered_broadcast/mod.rs index c3c9951a37..240f5008b9 100644 --- a/consensus/src/ordered_broadcast/mod.rs +++ b/consensus/src/ordered_broadcast/mod.rs @@ -230,7 +230,6 @@ mod tests { sequencers_provider: sequencers, validators_provider, automaton: automaton.clone(), - relay: automaton.clone(), reporter: reporters.get(validator).unwrap().clone(), monitor, namespace: namespace.to_vec(), @@ -735,7 +734,6 @@ mod tests { sequencer_signer: Some(fixture.private_keys[idx].clone()), sequencers_provider: sequencers, validators_provider, - relay: automaton.clone(), automaton: automaton.clone(), reporter: reporters.get(validator).unwrap().clone(), monitor, @@ -886,7 +884,6 @@ mod tests { sequencer_signer: None::, // Validators don't propose in this test sequencers_provider: sequencers, validators_provider, - relay: automaton.clone(), automaton: automaton.clone(), reporter: reporters.get(validator).unwrap().clone(), monitor, @@ -935,7 +932,6 @@ mod tests { sequencer.public_key() ]), validators_provider, - relay: automaton.clone(), automaton, reporter: reporters.get(&sequencer.public_key()).unwrap().clone(), monitor: mocks::Monitor::new(epoch), @@ -1067,4 +1063,112 @@ mod tests { fn test_1k_ed25519() { run_1k(ed25519::fixture); } + + fn requires_repropose(fixture: F) + where + S: Scheme, + F: FnOnce(&mut deterministic::Context, u32) -> Fixture, + { + let runner = deterministic::Runner::timed(Duration::from_secs(120)); + + runner.start(|mut context| async move { + let epoch = Epoch::new(111); + let num_validators = 4; + let fixture = fixture(&mut context, num_validators); + + let (network, mut oracle) = Network::new( + context.with_label("network"), + commonware_p2p::simulated::Config { + max_size: 1024 * 1024, + disconnect_on_block: true, + tracked_peer_sets: None, + }, + ); + network.start(); + + let mut registrations = register_participants(&mut oracle, &fixture.participants).await; + link_participants( + &mut oracle, + &fixture.participants, + Action::Link(RELIABLE_LINK), + None, + ) + .await; + + let mut reporters = BTreeMap::new(); + let namespace = b"my testing namespace"; + + // Create a shared automaton - all validators share the same delivered state + // so when one sequencer's repropose succeeds, all validators can verify + let shared_automaton = mocks::DropFirstAutomaton::::new(); + + for (idx, validator) in fixture.participants.iter().enumerate() { + let context = context.with_label(&format!("validator_{validator}")); + let monitor = mocks::Monitor::new(epoch); + let sequencers = mocks::Sequencers::::new(fixture.participants.clone()); + + let validators_provider = mocks::Provider::new(); + assert!(validators_provider.register(epoch, fixture.schemes[idx].clone())); + + // Use DropFirstAutomaton which requires repropose to succeed + let automaton = shared_automaton.clone(); + // Allow unlimited misses since every first proposal fails until repropose + let (reporter, reporter_mailbox) = mocks::Reporter::new( + context.clone(), + namespace, + fixture.verifier.clone(), + None, + ); + context.with_label("reporter").spawn(|_| reporter.run()); + reporters.insert(validator.clone(), reporter_mailbox); + + let engine = Engine::new( + context.with_label("engine"), + Config { + sequencer_signer: Some(fixture.private_keys[idx].clone()), + sequencers_provider: sequencers, + validators_provider, + automaton, + reporter: reporters.get(validator).unwrap().clone(), + monitor, + namespace: namespace.to_vec(), + priority_proposals: false, + priority_acks: false, + // Short rebroadcast timeout to trigger repropose quickly + rebroadcast_timeout: Duration::from_millis(100), + epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)), + height_bound: 2, + journal_heights_per_section: 10, + journal_replay_buffer: NZUsize!(4096), + journal_write_buffer: NZUsize!(4096), + journal_name_prefix: format!("ordered-broadcast-seq-{validator}-"), + journal_compression: Some(3), + journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE), + }, + ); + + let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap(); + engine.start((a1, a2), (b1, b2)); + } + + // Test passes if validators can make progress despite first broadcast + // being ignored - this requires repropose to work correctly. + await_reporters( + context.with_label("reporter"), + reporters.keys().cloned().collect::>(), + &reporters, + (50, epoch, true), + ) + .await; + }); + } + + #[test_traced] + fn test_requires_repropose() { + requires_repropose(bls12381_threshold::fixture::); + requires_repropose(bls12381_threshold::fixture::); + requires_repropose(bls12381_multisig::fixture::); + requires_repropose(bls12381_multisig::fixture::); + requires_repropose(ed25519::fixture); + } } diff --git a/consensus/src/simplex/actors/voter/actor.rs b/consensus/src/simplex/actors/voter/actor.rs index 65bf8dc56f..dcd9e76dd9 100644 --- a/consensus/src/simplex/actors/voter/actor.rs +++ b/consensus/src/simplex/actors/voter/actor.rs @@ -16,7 +16,7 @@ use crate::{ }, }, types::{Round as Rnd, View}, - CertifiableAutomaton, Relay, Reporter, Viewable, LATENCY, + CertifiableAutomaton, Reporter, Viewable, LATENCY, }; use commonware_codec::Read; use commonware_cryptography::Digest; @@ -98,14 +98,12 @@ pub struct Actor< B: Blocker, D: Digest, A: CertifiableAutomaton>, - R: Relay, F: Reporter>, > { context: ContextCell, state: State, blocker: B, automaton: A, - relay: R, reporter: F, certificate_config: ::Cfg, @@ -130,11 +128,10 @@ impl< B: Blocker, D: Digest, A: CertifiableAutomaton>, - R: Relay, F: Reporter>, - > Actor + > Actor { - pub fn new(context: E, cfg: Config) -> (Self, Mailbox) { + pub fn new(context: E, cfg: Config) -> (Self, Mailbox) { // Assert correctness of timeouts if cfg.leader_timeout > cfg.notarization_timeout { panic!("leader timeout must be less than or equal to notarization timeout"); @@ -183,7 +180,6 @@ impl< state, blocker: cfg.blocker, automaton: cfg.automaton, - relay: cfg.relay, reporter: cfg.reporter, certificate_config, @@ -925,9 +921,6 @@ impl< continue; } view = self.state.current_view(); - - // Notify application of proposal - self.relay.broadcast(proposed).await; }, (context, verified) = verify_wait => { // Clear verify waiter diff --git a/consensus/src/simplex/actors/voter/mod.rs b/consensus/src/simplex/actors/voter/mod.rs index bb43535e46..dbf0de8209 100644 --- a/consensus/src/simplex/actors/voter/mod.rs +++ b/consensus/src/simplex/actors/voter/mod.rs @@ -7,7 +7,7 @@ mod state; use crate::{ simplex::{elector::Config as Elector, types::Activity}, types::{Epoch, ViewDelta}, - CertifiableAutomaton, Relay, Reporter, + CertifiableAutomaton, Reporter, }; pub use actor::Actor; use commonware_cryptography::{certificate::Scheme, Digest}; @@ -24,14 +24,12 @@ pub struct Config< B: Blocker, D: Digest, A: CertifiableAutomaton, - R: Relay, F: Reporter>, > { pub scheme: S, pub elector: L, pub blocker: B, pub automaton: A, - pub relay: R, pub reporter: F, pub partition: String, @@ -188,7 +186,6 @@ mod tests { elector, blocker: oracle.control(me.clone()), automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: "test".to_string(), epoch: Epoch::new(333), @@ -425,7 +422,6 @@ mod tests { elector, blocker: oracle.control(me.clone()), automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: format!("voter_actor_test_{me}"), epoch: Epoch::new(333), @@ -705,7 +701,6 @@ mod tests { elector, blocker: oracle.control(participants[0].clone()), automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: "voter_finalization_test".to_string(), epoch: Epoch::new(333), @@ -889,7 +884,6 @@ mod tests { elector, blocker: oracle.control(participants[0].clone()), automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: "voter_certificate_conflicts_proposal_test".to_string(), epoch: Epoch::new(333), @@ -1082,7 +1076,6 @@ mod tests { elector, blocker: oracle.control(participants[0].clone()), automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: "voter_proposal_conflicts_certificate_test".to_string(), epoch: Epoch::new(333), @@ -1262,7 +1255,6 @@ mod tests { elector, blocker: oracle.control(participants[0].clone()), automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: "voter_certificate_verifies_proposal_test".to_string(), epoch: Epoch::new(333), @@ -1458,7 +1450,6 @@ mod tests { elector: elector_config, blocker: oracle.control(leader.clone()), automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: "voter_leader".to_string(), epoch, @@ -1665,7 +1656,6 @@ mod tests { elector: elector.clone(), blocker: oracle.control(participants[0].clone()), automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: "voter_populate_resolver_on_restart_test".to_string(), epoch: Epoch::new(333), @@ -1757,7 +1747,6 @@ mod tests { elector: elector.clone(), blocker: oracle.control(participants[0].clone()), automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: "voter_populate_resolver_on_restart_test".to_string(), epoch: Epoch::new(333), @@ -1897,7 +1886,6 @@ mod tests { elector, blocker: oracle.control(participants[0].clone()), automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: "finalization_from_resolver".to_string(), epoch: Epoch::new(333), @@ -2060,7 +2048,6 @@ mod tests { elector, blocker: oracle.control(participants[0].clone()), automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: "no_resolver_boomerang".to_string(), epoch: Epoch::new(333), @@ -2240,7 +2227,6 @@ mod tests { elector, blocker: oracle.control(me.clone()), automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: format!("voter_verify_fail_test_{me}"), epoch: Epoch::new(333), @@ -2466,7 +2452,6 @@ mod tests { elector: elector.clone(), blocker: oracle.control(me.clone()), automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: "no_recertification_after_replay".to_string(), epoch: Epoch::new(333), @@ -2600,7 +2585,6 @@ mod tests { elector: elector.clone(), blocker: oracle.control(me.clone()), automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: "no_recertification_after_replay".to_string(), epoch: Epoch::new(333), diff --git a/consensus/src/simplex/config.rs b/consensus/src/simplex/config.rs index 7e5bf50d8a..8758fa8d11 100644 --- a/consensus/src/simplex/config.rs +++ b/consensus/src/simplex/config.rs @@ -4,7 +4,7 @@ use super::{ }; use crate::{ types::{Epoch, ViewDelta}, - CertifiableAutomaton, Relay, Reporter, + CertifiableAutomaton, Reporter, }; use commonware_cryptography::{certificate::Scheme, Digest}; use commonware_p2p::Blocker; @@ -18,7 +18,6 @@ pub struct Config< B: Blocker, D: Digest, A: CertifiableAutomaton>, - R: Relay, F: Reporter>, > { /// Signing scheme for the consensus engine. @@ -48,9 +47,6 @@ pub struct Config< /// Automaton for the consensus engine. pub automaton: A, - /// Relay for the consensus engine. - pub relay: R, - /// Reporter for the consensus engine. /// /// All activity is exported for downstream applications that benefit from total observability, @@ -116,9 +112,8 @@ impl< B: Blocker, D: Digest, A: CertifiableAutomaton>, - R: Relay, F: Reporter>, - > Config + > Config { /// Assert enforces that all configuration values are valid. pub fn assert(&self) { diff --git a/consensus/src/simplex/engine.rs b/consensus/src/simplex/engine.rs index cc7c91897d..bdc120fcc0 100644 --- a/consensus/src/simplex/engine.rs +++ b/consensus/src/simplex/engine.rs @@ -4,7 +4,7 @@ use super::{ elector::Config as Elector, types::{Activity, Context}, }; -use crate::{simplex::scheme::Scheme, CertifiableAutomaton, Relay, Reporter}; +use crate::{simplex::scheme::Scheme, CertifiableAutomaton, Reporter}; use commonware_cryptography::Digest; use commonware_macros::select; use commonware_p2p::{Blocker, Receiver, Sender}; @@ -20,12 +20,11 @@ pub struct Engine< B: Blocker, D: Digest, A: CertifiableAutomaton, Digest = D>, - R: Relay, F: Reporter>, > { context: ContextCell, - voter: voter::Actor, + voter: voter::Actor, voter_mailbox: voter::Mailbox, batcher: batcher::Actor, @@ -42,12 +41,11 @@ impl< B: Blocker, D: Digest, A: CertifiableAutomaton, Digest = D>, - R: Relay, F: Reporter>, - > Engine + > Engine { /// Create a new `simplex` consensus engine. - pub fn new(context: E, cfg: Config) -> Self { + pub fn new(context: E, cfg: Config) -> Self { // Ensure configuration is valid cfg.assert(); @@ -74,7 +72,6 @@ impl< elector: cfg.elector, blocker: cfg.blocker.clone(), automaton: cfg.automaton, - relay: cfg.relay, reporter: cfg.reporter, partition: cfg.partition, mailbox_size: cfg.mailbox_size, diff --git a/consensus/src/simplex/mocks/application.rs b/consensus/src/simplex/mocks/application.rs index 6698b3e499..7093a34955 100644 --- a/consensus/src/simplex/mocks/application.rs +++ b/consensus/src/simplex/mocks/application.rs @@ -5,7 +5,7 @@ use super::relay::Relay; use crate::{ simplex::types::Context, types::{Epoch, Round}, - Automaton as Au, CertifiableAutomaton as CAu, Relay as Re, + Automaton as Au, CertifiableAutomaton as CAu, }; use bytes::Bytes; use commonware_codec::{DecodeExt, Encode}; @@ -43,9 +43,6 @@ pub enum Message { payload: D, response: oneshot::Sender, }, - Broadcast { - payload: D, - }, } #[derive(Clone)] @@ -113,17 +110,6 @@ impl CAu for Mailbox { } } -impl Re for Mailbox { - type Digest = D; - - async fn broadcast(&mut self, payload: Self::Digest) { - self.sender - .send(Message::Broadcast { payload }) - .await - .expect("Failed to send broadcast"); - } -} - const GENESIS_BYTES: &[u8] = b"genesis"; type Latency = (f64, f64); @@ -177,8 +163,6 @@ pub struct Application { fail_verification: bool, should_certify: Certifier, - pending: HashMap, - verified: HashSet, } @@ -212,7 +196,6 @@ impl Application fail_verification: false, should_certify: cfg.should_certify, - pending: HashMap::new(), verified: HashSet::new(), }, Mailbox::new(sender), @@ -253,8 +236,10 @@ impl Application // Mark verified self.verified.insert(digest); - // Store pending payload - self.pending.insert(digest, payload.into()); + // Broadcast payload to other participants + self.relay + .broadcast(&self.me, (digest, payload.into())) + .await; digest } @@ -310,11 +295,6 @@ impl Application } } - async fn broadcast(&mut self, payload: H::Digest) { - let contents = self.pending.remove(&payload).expect("missing payload"); - self.relay.broadcast(&self.me, (payload, contents)).await; - } - pub fn start(mut self) -> Handle<()> { spawn_cell!(self.context, self.run().await) } @@ -364,9 +344,6 @@ impl Application let certified = self.certify(payload, contents).await; let _ = response.send(certified); } - Message::Broadcast { payload } => { - self.broadcast(payload).await; - } } }, broadcast = self.broadcast.next() => { diff --git a/consensus/src/simplex/mod.rs b/consensus/src/simplex/mod.rs index 1c3f06e0d1..c9617e408d 100644 --- a/consensus/src/simplex/mod.rs +++ b/consensus/src/simplex/mod.rs @@ -521,7 +521,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, @@ -789,7 +788,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, @@ -957,7 +955,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, @@ -1147,7 +1144,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, @@ -1268,7 +1264,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: me.to_string(), mailbox_size: 1024, @@ -1409,7 +1404,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, @@ -1671,7 +1665,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, @@ -1848,7 +1841,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, @@ -2062,7 +2054,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, @@ -2273,7 +2264,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, @@ -2508,7 +2498,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, @@ -2687,7 +2676,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.clone().to_string(), mailbox_size: 1024, @@ -2867,7 +2855,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.clone().to_string(), mailbox_size: 1024, @@ -3043,7 +3030,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, @@ -3134,7 +3120,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, @@ -3335,7 +3320,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, @@ -3503,7 +3487,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.clone().to_string(), mailbox_size: 1024, @@ -3685,7 +3668,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.clone().to_string(), mailbox_size: 1024, @@ -3835,7 +3817,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, @@ -3995,7 +3976,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: participants[0].clone().to_string(), mailbox_size: 64, @@ -4181,7 +4161,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: attributable_reporter, partition: validator.to_string(), mailbox_size: 1024, @@ -4418,7 +4397,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, @@ -4754,7 +4732,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, @@ -4905,7 +4882,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: validator.to_string(), mailbox_size: 1024, @@ -5001,7 +4977,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: selected_reporter, partition: validator.to_string(), mailbox_size: 1024, @@ -5399,7 +5374,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: label, mailbox_size: 1024, @@ -5457,7 +5431,6 @@ mod tests { elector: elector.clone(), blocker, automaton: application.clone(), - relay: application.clone(), reporter: reporter.clone(), partition: label, mailbox_size: 1024, diff --git a/examples/bridge/src/application/ingress.rs b/examples/bridge/src/application/ingress.rs index 2cbc0f6fe5..ba5c18d1fb 100644 --- a/examples/bridge/src/application/ingress.rs +++ b/examples/bridge/src/application/ingress.rs @@ -2,7 +2,7 @@ use crate::Scheme; use commonware_consensus::{ simplex::types::{Activity, Context}, types::{Epoch, Round}, - Automaton as Au, CertifiableAutomaton as CAu, Relay as Re, Reporter, + Automaton as Au, CertifiableAutomaton as CAu, Reporter, }; use commonware_cryptography::{ed25519::PublicKey, Digest}; use futures::{ @@ -91,17 +91,6 @@ impl CAu for Mailbox { // Uses default certify implementation which always returns true } -impl Re for Mailbox { - type Digest = D; - - async fn broadcast(&mut self, _: Self::Digest) { - // We don't broadcast our raw messages to other peers. - // - // If we were building an EVM blockchain, for example, we'd - // send the block to other peers here. - } -} - impl Reporter for Mailbox { type Activity = Activity; diff --git a/examples/bridge/src/bin/validator.rs b/examples/bridge/src/bin/validator.rs index b6eac326af..b76745fa22 100644 --- a/examples/bridge/src/bin/validator.rs +++ b/examples/bridge/src/bin/validator.rs @@ -240,7 +240,6 @@ fn main() { elector: Random, blocker: oracle, automaton: mailbox.clone(), - relay: mailbox.clone(), reporter: mailbox.clone(), partition: String::from("log"), mailbox_size: 1024, diff --git a/examples/log/src/application/ingress.rs b/examples/log/src/application/ingress.rs index 5fed8b50eb..8468a22d67 100644 --- a/examples/log/src/application/ingress.rs +++ b/examples/log/src/application/ingress.rs @@ -1,6 +1,5 @@ use commonware_consensus::{ simplex::types::Context, types::Epoch, Automaton as Au, CertifiableAutomaton as CAu, - Relay as Re, }; use commonware_cryptography::{ed25519::PublicKey, Digest}; use futures::{ @@ -81,14 +80,3 @@ impl Au for Mailbox { impl CAu for Mailbox { // Uses default certify implementation which always returns true } - -impl Re for Mailbox { - type Digest = D; - - async fn broadcast(&mut self, _: Self::Digest) { - // We don't broadcast our raw messages to other peers. - // - // If we were building an EVM blockchain, for example, we'd - // send the block to other peers here. - } -} diff --git a/examples/log/src/main.rs b/examples/log/src/main.rs index 7ed7f3b9c6..bd5bc8d739 100644 --- a/examples/log/src/main.rs +++ b/examples/log/src/main.rs @@ -208,7 +208,6 @@ fn main() { elector: RoundRobin::::default(), blocker: oracle, automaton: mailbox.clone(), - relay: mailbox.clone(), reporter: reporter.clone(), namespace, partition: String::from("log"), diff --git a/examples/reshare/src/orchestrator/actor.rs b/examples/reshare/src/orchestrator/actor.rs index 8f96747bec..0a1b9c8924 100644 --- a/examples/reshare/src/orchestrator/actor.rs +++ b/examples/reshare/src/orchestrator/actor.rs @@ -9,7 +9,7 @@ use commonware_consensus::{ marshal, simplex::{self, elector::Config as Elector, scheme, types::Context}, types::{Epoch, Epocher, FixedEpocher, ViewDelta}, - CertifiableAutomaton, Relay, + CertifiableAutomaton, }; use commonware_cryptography::{ bls12381::primitives::variant::Variant, certificate::Scheme, Hasher, Signer, @@ -35,8 +35,7 @@ where V: Variant, C: Signer, H: Hasher, - A: CertifiableAutomaton, Digest = H::Digest> - + Relay, + A: CertifiableAutomaton, Digest = H::Digest>, S: Scheme, L: Elector, { @@ -62,8 +61,7 @@ where V: Variant, C: Signer, H: Hasher, - A: CertifiableAutomaton, Digest = H::Digest> - + Relay, + A: CertifiableAutomaton, Digest = H::Digest>, S: Scheme, L: Elector, Provider: EpochProvider, @@ -90,8 +88,7 @@ where V: Variant, C: Signer, H: Hasher, - A: CertifiableAutomaton, Digest = H::Digest> - + Relay, + A: CertifiableAutomaton, Digest = H::Digest>, S: scheme::Scheme, L: Elector, Provider: EpochProvider, @@ -297,7 +294,6 @@ where elector, blocker: self.oracle.clone(), automaton: self.application.clone(), - relay: self.application.clone(), reporter: self.marshal.clone(), partition: format!("{}_consensus_{}", self.partition_prefix, epoch), mailbox_size: 1024,