diff --git a/application/src/actor.rs b/application/src/actor.rs index 8872442..1cd5bd3 100644 --- a/application/src/actor.rs +++ b/application/src/actor.rs @@ -148,7 +148,7 @@ impl< parent, mut response, } => { - debug!("{rand_id} application: Handling message Propose for round {} (epoch {}, view {}), parent height: {}", + debug!("{rand_id} application: Handling message Propose for round {} (epoch {}, view {}), parent view: {}", round, round.epoch(), round.view(), parent.0); let built = self.built_block.clone(); @@ -156,7 +156,7 @@ impl< let proposal_start = std::time::Instant::now(); select! { - res = self.handle_proposal((parent.0.get(), parent.1), &mut syncer, &mut finalizer, round) => { + res = self.handle_proposal(parent, &mut syncer, &mut finalizer, round) => { match res { Ok(block) => { // store block @@ -179,7 +179,7 @@ impl< let elapsed = proposal_start.elapsed(); warn!( round = ?round, - parent_height = parent.0.get(), + parent_view = parent.0.get(), parent_digest = ?parent.1, elapsed_ms = elapsed.as_millis(), "proposal aborted - consensus timed out waiting for block (possible notarize-nullify race)" @@ -191,7 +191,7 @@ impl< #[cfg(not(feature = "prom"))] warn!( round = ?round, - parent_height = parent.0.get(), + parent_view = parent.0.get(), parent_digest = ?parent.1, "proposal aborted - consensus timed out waiting for block (possible notarize-nullify race)" ); @@ -222,7 +222,7 @@ impl< payload, mut response, } => { - debug!("{rand_id} application: Handling message Verify for round {} (epoch {}, view {}), parent height: {}", + debug!("{rand_id} application: Handling message Verify for round {} (epoch {}, view {}), parent view: {}", round, round.epoch(), round.view(), parent.0); // Subscribe to blocks (will wait for them if not available) @@ -258,8 +258,9 @@ impl< // Request the current epoch #[cfg(feature = "prom")] let aux_data_start = std::time::Instant::now(); + let parent_digest = parent.digest(); let aux_data = finalizer_clone - .get_aux_data(parent.height() + 1) + .get_aux_data(parent.height() + 1, parent_digest) .await .await .expect("Finalizer dropped"); @@ -303,7 +304,7 @@ impl< async fn handle_proposal( &mut self, - parent: (u64, Digest), + parent: (View, Digest), syncer: &mut SyncerMailbox>, finalizer: &mut FinalizerMailbox>, round: Round, @@ -314,15 +315,15 @@ impl< // STEP 1: Get the parent block #[cfg(feature = "prom")] let parent_fetch_start = std::time::Instant::now(); - let parent_request = if parent.1 == self.genesis_hash.into() { + let parent_block = if parent.1 == self.genesis_hash.into() { Either::Left(future::ready(Ok(Block::genesis(self.genesis_hash)))) } else { - let parent_round = if parent.0 == 0 { + let parent_round = if parent.0.get() == 0 { // Parent view is 0, which means that this is the first block of the epoch // TODO(matthias): verify that the parent view of the first block is always 0 (nullify) None } else { - Some(Round::new(round.epoch(), View::new(parent.0))) + Some(Round::new(round.epoch(), parent.0)) }; Either::Right( syncer @@ -331,7 +332,7 @@ impl< .map(|x| x.context("")), ) }; - let parent = parent_request.await.expect("sender dropped"); + let parent_block = parent_block.await.expect("sender dropped"); #[cfg(feature = "prom")] { @@ -344,7 +345,11 @@ impl< #[cfg(feature = "prom")] let finalizer_wait_start = std::time::Instant::now(); // now that we have the parent additionally await for that to be executed by the finalizer - let rx = finalizer.notify_at_height(parent.height()).await; + let parent_height = parent_block.height(); + let 
parent_digest = parent_block.digest(); + let rx = finalizer + .notify_at_height(parent_height, parent_digest) + .await; // await for notification rx.await.expect("Finalizer dropped"); #[cfg(feature = "prom")] @@ -357,8 +362,8 @@ impl< // STEP 3: Request aux data (withdrawals, checkpoint hash, header hash) #[cfg(feature = "prom")] let aux_data_start = std::time::Instant::now(); - let aux_data = finalizer - .get_aux_data(parent.height() + 1) + let mut aux_data = finalizer + .get_aux_data(parent_height + 1, parent_digest) .await .await .expect("Finalizer dropped"); @@ -382,23 +387,25 @@ impl< // re-propose it as to not produce any blocks that will be cut out // by the epoch transition. let last_in_epoch = utils::last_block_in_epoch(self.epoch_num_of_blocks, aux_data.epoch); - if parent.height() == last_in_epoch { - debug!(round = ?round, digest = ?parent.digest(), "re-proposed parent block at epoch boundary"); - return Ok(parent); + if parent_block.height() == last_in_epoch { + debug!(round = ?round, digest = ?parent_block.digest(), "re-proposed parent block at epoch boundary"); + return Ok(parent_block); } let pending_withdrawals = aux_data.withdrawals; let checkpoint_hash = aux_data.checkpoint_hash; let mut current = self.context.current().epoch_millis(); - if current <= parent.timestamp() { - current = parent.timestamp() + 1; + if current <= parent_block.timestamp() { + current = parent_block.timestamp() + 1; } // STEP 4: Start building block (Engine Client) #[cfg(feature = "prom")] let start_building_start = std::time::Instant::now(); + aux_data.forkchoice.head_block_hash = parent_block.eth_block_hash().into(); + // Add pending withdrawals to the block let withdrawals = pending_withdrawals.into_iter().map(|w| w.inner).collect(); let payload_id = { @@ -409,7 +416,7 @@ impl< aux_data.forkchoice, current, withdrawals, - parent.height(), + parent_block.height(), ) .await } @@ -446,8 +453,8 @@ impl< let compute_digest_start = std::time::Instant::now(); let block = Block::compute_digest( - parent.digest(), - parent.height() + 1, + parent_block.digest(), + parent_block.height() + 1, current, payload_envelope.envelope_inner.execution_payload, payload_envelope.execution_requests.to_vec(), diff --git a/finalizer/src/actor.rs b/finalizer/src/actor.rs index 7184c3b..24974aa 100644 --- a/finalizer/src/actor.rs +++ b/finalizer/src/actor.rs @@ -23,7 +23,7 @@ use metrics::{counter, histogram}; #[cfg(debug_assertions)] use prometheus_client::metrics::gauge::Gauge; use rand::Rng; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::marker::PhantomData; use std::num::NonZero; use std::time::Instant; @@ -39,10 +39,17 @@ use summit_types::utils::{is_last_block_of_epoch, is_penultimate_block_of_epoch} use summit_types::{Block, BlockAuxData, Digest, FinalizedHeader, PublicKey, Signature}; use summit_types::{EngineClient, consensus_state::ConsensusState}; use tokio_util::sync::CancellationToken; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; const WRITE_BUFFER: NonZero = NZUsize!(1024 * 1024); +/// Tracks the consensus state for a notarized (but not yet finalized) block +#[derive(Clone, Debug)] +struct ForkState { + block_digest: Digest, + consensus_state: ConsensusState, +} + pub struct Finalizer< R: Storage + Metrics + Clock + Spawner + governor::clock::Clock + Rng, C: EngineClient, @@ -51,11 +58,20 @@ pub struct Finalizer< V: Variant, > { mailbox: mpsc::Receiver, Block>>, - pending_height_notifys: BTreeMap>>, + pending_height_notifys: BTreeMap<(u64, Digest), 
Vec<oneshot::Sender<()>>>, context: ContextCell, engine_client: C, db: FinalizerState, - state: ConsensusState, + + // Canonical state (finalized) - contains latest_height + canonical_state: ConsensusState, + + // Fork states (notarized but not yet finalized) + fork_states: BTreeMap<u64, HashMap<Digest, ForkState>>, + + // Orphaned notarized blocks that arrived before their parent + orphaned_blocks: BTreeMap<u64, HashMap<Digest, Vec<Block>>>, + genesis_hash: [u8; 32], validator_max_withdrawals_per_block: usize, epoch_num_of_blocks: u64, @@ -129,7 +145,9 @@ impl< pending_height_notifys: BTreeMap::new(), epoch_num_of_blocks: cfg.epoch_num_of_blocks, db, - state: state.clone(), + canonical_state: state.clone(), + fork_states: BTreeMap::new(), + orphaned_blocks: BTreeMap::new(), validator_max_withdrawals_per_block: cfg.validator_max_withdrawals_per_block, genesis_hash: cfg.genesis_hash, protocol_version_digest: Sha256::hash(&cfg.protocol_version.to_le_bytes()), @@ -158,16 +176,18 @@ impl< // Initialize the current epoch with the validator set // This ensures the orchestrator can start consensus immediately - let active_validators = self.state.get_active_validators(); + let active_validators = self.canonical_state.get_active_validators(); let network_keys: Vec<_> = active_validators .iter() .map(|(node_key, _)| node_key.clone()) .collect(); - self.oracle.register(self.state.epoch, network_keys).await; + self.oracle + .register(self.canonical_state.epoch, network_keys) + .await; self.orchestrator_mailbox .report(Message::Enter(EpochTransition { - epoch: Epoch::new(self.state.epoch), + epoch: Epoch::new(self.canonical_state.epoch), validator_keys: active_validators, })) .await; @@ -188,26 +208,40 @@ impl< Update::Tip(_height, _digest) => { // I don't think we need this } - Update::Block((block, finalization), ack_tx) => { - self.handle_execution_block(ack_tx, block, finalization, &mut last_committed_timestamp).await; + Update::FinalizedBlock((block, finalization), ack_tx) => { + self.handle_finalized_block(ack_tx, block, finalization, &mut last_committed_timestamp).await; + } + Update::NotarizedBlock(block) => { + self.handle_notarized_block(block).await; + } } }, - FinalizerMessage::NotifyAtHeight { height, response } => { - let last_indexed = self.state.get_latest_height(); - if last_indexed >= height { + FinalizerMessage::NotifyAtHeight { height, block_digest, response } => { + // Check if this specific block has been executed (either canonical or fork) + let executed = if self.canonical_state.get_latest_height() >= height { + // Block could be canonical - we don't track canonical block digests per height, + // so we assume if canonical height >= height, the block is executed + true + } else { + // Check if it exists in fork states + self.fork_states.get(&height) + .map(|forks| forks.contains_key(&block_digest)) + .unwrap_or(false) + }; + + if executed { let _ = response.send(()); continue; } - self.pending_height_notifys.entry(height).or_default().push(response); + self.pending_height_notifys.entry((height, block_digest)).or_default().push(response); }, - FinalizerMessage::GetAuxData { height, response } => { - self.handle_aux_data_mailbox(height, response).await; + FinalizerMessage::GetAuxData { height, parent_digest, response } => { + self.handle_aux_data_mailbox(height, parent_digest, response).await; }, FinalizerMessage::GetEpochGenesisHash { epoch, response } => { // TODO(matthias): verify that this can never happen - assert_eq!(epoch, self.state.epoch); - let _ = response.send(self.state.epoch_genesis_hash); + assert_eq!(epoch, self.canonical_state.epoch); + let _ = 
response.send(self.canonical_state.epoch_genesis_hash); }, FinalizerMessage::QueryState { request, response } => { self.handle_consensus_state_query(request, response).await; @@ -227,7 +261,7 @@ impl< } #[allow(clippy::type_complexity)] - async fn handle_execution_block( + async fn handle_finalized_block( &mut self, ack_tx: Exact, block: Block, @@ -239,124 +273,99 @@ impl< >, #[allow(unused_variables)] last_committed_timestamp: &mut Option, ) { - #[cfg(feature = "prom")] - let block_processing_start = Instant::now(); - - // check the payload - #[cfg(feature = "prom")] - let payload_check_start = Instant::now(); - let payload_status = self.engine_client.check_payload(&block).await; - let new_height = block.height(); - - #[cfg(feature = "prom")] + let height = block.height(); + let block_digest = block.digest(); + + // Try to find the fork state for this block (if it was notarized before finalization) + if let Some(fork_state) = self + .fork_states + .get(&height) + .and_then(|forks_at_height| forks_at_height.get(&block_digest)) { - let payload_check_duration = payload_check_start.elapsed().as_millis() as f64; - histogram!("payload_check_duration_millis").record(payload_check_duration); - } - - // Verify withdrawal requests that were included in the block - // Make sure that the included withdrawals match the expected withdrawals - let expected_withdrawals: Vec = - if is_last_block_of_epoch(self.epoch_num_of_blocks, new_height) { - let pending_withdrawals = self.state.get_next_ready_withdrawals( - new_height, - self.validator_max_withdrawals_per_block, - ); - pending_withdrawals.into_iter().map(|w| w.inner).collect() - } else { - vec![] - }; - - if payload_status.is_valid() - && block.payload.payload_inner.withdrawals == expected_withdrawals - && self.state.forkchoice.head_block_hash == block.eth_parent_hash() - { - let eth_hash = block.eth_block_hash(); - info!( - "Commiting block 0x{} for height {}", - hex(ð_hash), - new_height + // Block was already executed when notarized, reuse the fork state + debug_assert_eq!( + fork_state.block_digest, block_digest, + "Fork state digest mismatch: expected {:?}, stored {:?}", + block_digest, fork_state.block_digest ); - - let forkchoice = ForkchoiceState { - head_block_hash: eth_hash.into(), - safe_block_hash: eth_hash.into(), - finalized_block_hash: eth_hash.into(), - }; - - #[cfg(feature = "prom")] - { - let num_tx = block.payload.payload_inner.payload_inner.transactions.len(); - counter!("tx_committed_total").increment(num_tx as u64); - counter!("blocks_committed_total").increment(1); - if let Some(last_committed) = last_committed_timestamp { - let block_delta = last_committed.elapsed().as_millis() as f64; - histogram!("block_time_millis").record(block_delta); - } - *last_committed_timestamp = Some(Instant::now()); - } - - self.engine_client.commit_hash(forkchoice).await; - - self.state.forkchoice = forkchoice; - - // Parse execution requests - #[cfg(feature = "prom")] - let parse_requests_start = Instant::now(); - self.parse_execution_requests(&block, new_height).await; - #[cfg(feature = "prom")] - { - let parse_requests_duration = parse_requests_start.elapsed().as_millis() as f64; - histogram!("parse_execution_requests_duration_millis") - .record(parse_requests_duration); - } - - // Add validators that deposited to the validator set - #[cfg(feature = "prom")] - let process_requests_start = Instant::now(); - self.process_execution_requests(&block, new_height).await; - #[cfg(feature = "prom")] - { - let process_requests_duration = 
process_requests_start.elapsed().as_millis() as f64; - histogram!("process_execution_requests_duration_millis") - .record(process_requests_duration); - } + debug!( + height, + ?block_digest, + "reusing fork state for finalized block" + ); + self.canonical_state = fork_state.consensus_state.clone(); } else { - warn!( - "Height: {new_height} contains invalid eth payload. Not executing but keeping part of chain" + // Block was not notarized before finalization (catch-up or missed notarization) + // Execute it now on canonical state + debug!( + height, + ?block_digest, + "executing finalized block directly (no prior fork state)" ); + execute_block( + &self.engine_client, + &self.context, + &block, + &mut self.canonical_state, + self.epoch_num_of_blocks, + self.validator_max_withdrawals_per_block, + self.protocol_version_digest, + self.validator_minimum_stake, + self.validator_withdrawal_period, + self.validator_onboarding_limit_per_block, + ) + .await; } - #[cfg(debug_assertions)] - { - let gauge: Gauge = Gauge::default(); - gauge.set(new_height as i64); - self.context.register("height", "chain height", gauge); + self.canonical_state.forkchoice.safe_block_hash = + self.canonical_state.forkchoice.head_block_hash; + self.canonical_state.forkchoice.finalized_block_hash = + self.canonical_state.forkchoice.head_block_hash; + + // Prune fork states at or below finalized height + let total_forks = self.fork_states.len(); + self.fork_states.retain(|&h, _| h > height); + let remaining_forks = self.fork_states.len(); + let num_pruned_forks = total_forks - remaining_forks; + if num_pruned_forks > 0 { + debug!(height, pruned = num_pruned_forks, "pruned fork states"); } - self.state.set_latest_height(new_height); - self.state.set_view(block.view()); - assert_eq!(block.epoch(), self.state.epoch); - - // Periodically persist state to database as a blob - // We build the checkpoint one height before the epoch end which - // allows the validators to sign the checkpoint hash in the last block - // of the epoch - if is_penultimate_block_of_epoch(self.epoch_num_of_blocks, new_height) { - #[cfg(feature = "prom")] - let checkpoint_creation_start = Instant::now(); - let checkpoint = Checkpoint::new(&self.state); - self.state.pending_checkpoint = Some(checkpoint); + // Prune orphaned blocks at or below finalized height + let total_orphans = self.orphaned_blocks.len(); + self.orphaned_blocks.retain(|&h, _| h > height); + let remaining_orphans = self.orphaned_blocks.len(); + let num_pruned_orphans = total_orphans - remaining_orphans; + if num_pruned_orphans > 0 { + debug!( + height, + pruned = num_pruned_orphans, + "pruned orphaned blocks" + ); + } - #[cfg(feature = "prom")] - { - let checkpoint_creation_duration = - checkpoint_creation_start.elapsed().as_millis() as f64; - histogram!("checkpoint_creation_duration_millis") - .record(checkpoint_creation_duration); + self.engine_client + .commit_hash(self.canonical_state.forkchoice) + .await; + + #[cfg(feature = "prom")] + { + let num_tx = block.payload.payload_inner.payload_inner.transactions.len(); + counter!("tx_committed_total").increment(num_tx as u64); + counter!("blocks_committed_total").increment(1); + if let Some(last_committed) = last_committed_timestamp { + let block_delta = last_committed.elapsed().as_millis() as f64; + histogram!("block_time_millis").record(block_delta); } + *last_committed_timestamp = Some(Instant::now()); } + let new_height = block.height(); + self.height_notify_up_to(new_height, block_digest); + ack_tx.acknowledge(); + info!(new_height, 
self.canonical_state.epoch, "executed block"); + let mut epoch_change = false; // Store finalized checkpoint to database if is_last_block_of_epoch(self.epoch_num_of_blocks, new_height) { if let Some(finalization) = finalization { @@ -397,22 +406,29 @@ impl< } // Add and remove validators for the next epoch - if !self.state.added_validators.is_empty() || !self.state.removed_validators.is_empty() + if !self.canonical_state.added_validators.is_empty() + || !self.canonical_state.removed_validators.is_empty() { // TODO(matthias): we can probably find a way to do this without iterating over the joining validators // Activate validators that staked this epoch. - for key in self.state.added_validators.iter() { + for key in self.canonical_state.added_validators.iter() { let key_bytes: [u8; 32] = key.as_ref().try_into().unwrap(); - let account = self.state.validator_accounts.get_mut(&key_bytes).expect( - "only validators with accounts are added to the added_validators queue", - ); + let account = self + .canonical_state + .validator_accounts + .get_mut(&key_bytes) + .expect( + "only validators with accounts are added to the added_validators queue", + ); account.status = ValidatorStatus::Active; } - for key in self.state.removed_validators.iter() { + for key in self.canonical_state.removed_validators.iter() { // TODO(matthias): I think this is not necessary. Inactive accounts will be removed after withdrawing. let key_bytes: [u8; 32] = key.as_ref().try_into().unwrap(); - if let Some(account) = self.state.validator_accounts.get_mut(&key_bytes) { + if let Some(account) = + self.canonical_state.validator_accounts.get_mut(&key_bytes) + { account.status = ValidatorStatus::Inactive; } } @@ -420,7 +436,7 @@ impl< // If the node's public key is contained in the removed validator list, // trigger an exit if self - .state + .canonical_state .removed_validators .iter() .any(|pk| pk == &self.node_public_key) @@ -436,18 +452,20 @@ impl< // The checkpoint is created at the penultimate block of the epoch, and finalized at the last // block. 
// TODO(matthias): verify this - if let Some(checkpoint) = &self.state.pending_checkpoint { + if let Some(checkpoint) = &self.canonical_state.pending_checkpoint { self.db - .store_finalized_checkpoint(self.state.epoch, checkpoint) + .store_finalized_checkpoint(self.canonical_state.epoch, checkpoint) .await; } // Increment epoch - self.state.epoch += 1; + self.canonical_state.epoch += 1; // Set the epoch genesis hash for the next epoch - self.state.epoch_genesis_hash = block.digest().0; + self.canonical_state.epoch_genesis_hash = block.digest().0; - self.db.store_consensus_state(new_height, &self.state).await; + self.db + .store_consensus_state(new_height, &self.canonical_state) + .await; // This will commit all changes to the state db self.db.commit().await; #[cfg(feature = "prom")] @@ -457,30 +475,32 @@ impl< } // Create the list of validators for the new epoch - let active_validators = self.state.get_active_validators(); + let active_validators = self.canonical_state.get_active_validators(); let network_keys = active_validators .iter() .map(|(node_key, _)| node_key.clone()) .collect(); - self.oracle.register(self.state.epoch, network_keys).await; + self.oracle + .register(self.canonical_state.epoch, network_keys) + .await; // Send the new validator list to the orchestrator and start the Simplex engine // for the new epoch - let active_validators = self.state.get_active_validators(); + let active_validators = self.canonical_state.get_active_validators(); self.orchestrator_mailbox .report(Message::Enter(EpochTransition { - epoch: Epoch::new(self.state.epoch), + epoch: Epoch::new(self.canonical_state.epoch), validator_keys: active_validators, })) .await; epoch_change = true; // Only clear the added and removed validators after saving the state to disk - if !self.state.added_validators.is_empty() { - self.state.added_validators.clear(); + if !self.canonical_state.added_validators.is_empty() { + self.canonical_state.added_validators.clear(); } - if !self.state.removed_validators.is_empty() { - self.state.removed_validators.clear(); + if !self.canonical_state.removed_validators.is_empty() { + self.canonical_state.removed_validators.clear(); } #[cfg(debug_assertions)] @@ -492,321 +512,138 @@ impl< } } - #[cfg(feature = "prom")] - { - let total_block_processing_duration = - block_processing_start.elapsed().as_millis() as f64; - histogram!("total_block_processing_duration_millis") - .record(total_block_processing_duration); - counter!("blocks_processed_total").increment(1); - } - - self.height_notify_up_to(new_height); - ack_tx.acknowledge(); - info!(new_height, self.state.epoch, "finalized block"); - if epoch_change { // Shut down the Simplex engine for the old epoch self.orchestrator_mailbox - .report(Message::Exit(Epoch::new(self.state.epoch - 1))) + .report(Message::Exit(Epoch::new(self.canonical_state.epoch - 1))) .await; } + info!(new_height, self.canonical_state.epoch, "finalized block"); } - async fn parse_execution_requests(&mut self, block: &Block, new_height: u64) { - for request_bytes in &block.execution_requests { - match ExecutionRequest::try_from_eth_bytes(request_bytes.as_ref()) { - Ok(execution_request) => { - match execution_request { - ExecutionRequest::Deposit(deposit_request) => { - let message = deposit_request.as_message(self.protocol_version_digest); - - let mut node_signature_bytes = &deposit_request.node_signature[..]; - let Ok(node_signature) = Signature::read(&mut node_signature_bytes) - else { - info!( - "Failed to parse node signature from deposit request: 
{deposit_request:?}" - ); - continue; // Skip this deposit request - }; - if !deposit_request - .node_pubkey - .verify(&[], &message, &node_signature) - { - #[cfg(debug_assertions)] - { - let gauge: Gauge = Gauge::default(); - gauge.set(new_height as i64); - self.context.register( - format!( - "{}_deposit_request_invalid_node_sig", - hex::encode(&deposit_request.node_pubkey) - ), - "height", - gauge, - ); - } - info!( - "Failed to verify node signature from deposit request: {deposit_request:?}" - ); - continue; // Skip this deposit request - } - - let mut consensus_signature_bytes = - &deposit_request.consensus_signature[..]; - let Ok(consensus_signature) = - bls12381::Signature::read(&mut consensus_signature_bytes) - else { - info!( - "Failed to parse consensus signature from deposit request: {deposit_request:?}" - ); - continue; // Skip this deposit request - }; - if !deposit_request.consensus_pubkey.verify( - &[], - &message, - &consensus_signature, - ) { - #[cfg(debug_assertions)] - { - let gauge: Gauge = Gauge::default(); - gauge.set(new_height as i64); - self.context.register( - format!( - "{}_deposit_request_invalid_consensus_sig", - hex::encode(&deposit_request.consensus_pubkey) - ), - "height", - gauge, - ); - } - info!( - "Failed to verify consensus signature from deposit request: {deposit_request:?}" - ); - continue; // Skip this deposit request - } + async fn handle_notarized_block(&mut self, block: Block) { + let mut to_process = vec![block]; - self.state.push_deposit(deposit_request); - } - ExecutionRequest::Withdrawal(mut withdrawal_request) => { - // Only add the withdrawal request if the validator exists and has sufficient balance - if let Some(mut account) = self - .state - .get_account(&withdrawal_request.validator_pubkey) - .cloned() - { - // If the validator already submitted an exit request, we skip this withdrawal request - if matches!(account.status, ValidatorStatus::SubmittedExitRequest) { - info!( - "Failed to parse withdrawal request because the validator already submitted a request: {withdrawal_request:?}" - ); - continue; // Skip this withdrawal request - } + while let Some(block) = to_process.pop() { + let height = block.height(); + let parent_digest = block.parent(); + let block_digest = block.digest(); - // The balance minus any pending withdrawals have to be larger than the amount of the withdrawal request - if account.balance - account.pending_withdrawal_amount - < withdrawal_request.amount - { - info!( - "Failed to parse withdrawal request due to insufficient balance: {withdrawal_request:?}" - ); - continue; // Skip this withdrawal request - } - - // The source address must match the validators withdrawal address - if withdrawal_request.source_address - != account.withdrawal_credentials - { - info!( - "Failed to parse withdrawal request because the source address doesn't match the withdrawal credentials: {withdrawal_request:?}" - ); - continue; // Skip this withdrawal request - } - // If after this withdrawal the validator balance would be less than the - // minimum stake, then the full validator balance is withdrawn. 
- if account.balance - - account.pending_withdrawal_amount - - withdrawal_request.amount - < self.validator_minimum_stake - { - // Check the remaining balance and set the withdrawal amount accordingly - let remaining_balance = - account.balance - account.pending_withdrawal_amount; - withdrawal_request.amount = remaining_balance; - account.status = ValidatorStatus::SubmittedExitRequest; - } + // Ignore blocks at or below canonical height + if height <= self.canonical_state.latest_height { + debug!( + height, + "ignoring notarized block at or below canonical height" + ); + continue; + } - account.pending_withdrawal_amount += withdrawal_request.amount; - self.state - .set_account(withdrawal_request.validator_pubkey, account); - self.state.push_withdrawal_request( - withdrawal_request.clone(), - new_height + self.validator_withdrawal_period, - ); - } - } - } - } - Err(e) => { - warn!("Failed to parse execution request: {}", e); + // Find and clone parent state: either canonical (if parent was finalized) or a fork state + let parent_state = if height == self.canonical_state.latest_height + 1 { + // Parent should be the canonical block (was finalized) + // Verify parent digest matches canonical head (skip check at genesis) + if self.canonical_state.latest_height > 0 + && parent_digest != self.canonical_state.head_digest + { + // Block is on a dead fork, discard it + debug!( + height, + ?parent_digest, + canonical_head = ?self.canonical_state.head_digest, + "discarding notarized block on dead fork (parent mismatch with canonical)" + ); + continue; } - } - } - } + Some(self.canonical_state.clone()) + } else { + // Parent should be in fork_states + self.fork_states + .get(&(height - 1)) + .and_then(|forks_at_parent| { + let parent_fork = forks_at_parent.get(&parent_digest)?; + debug_assert_eq!( + parent_fork.block_digest, + parent_digest, + "Parent fork state digest mismatch at height {}: expected {:?}, stored {:?}", + height - 1, + parent_digest, + parent_fork.block_digest + ); + Some(parent_fork.consensus_state.clone()) + }) + }; - async fn process_execution_requests(&mut self, block: &Block, new_height: u64) { - if is_penultimate_block_of_epoch(self.epoch_num_of_blocks, new_height) { - for _ in 0..self.validator_onboarding_limit_per_block { - if let Some(request) = self.state.pop_deposit() { - let mut validator_balance = 0; - let mut account_exists = false; - if let Some(mut account) = self - .state - .get_account(request.node_pubkey.as_ref().try_into().unwrap()) - .cloned() - { - if request.index > account.last_deposit_index { - account.balance += request.amount; - account.last_deposit_index = request.index; - #[allow(unused)] - #[cfg(debug_assertions)] - { - validator_balance = account.balance; - } - account.last_deposit_index = request.index; - validator_balance = account.balance; - self.state.set_account( - request.node_pubkey.as_ref().try_into().unwrap(), - account, - ); - account_exists = true; - } - } else { - // Validate the withdrawal credentials format - // Eth1 withdrawal credentials: 0x01 + 11 zero bytes + 20 bytes Ethereum address - if request.withdrawal_credentials.len() != 32 { - warn!( - "Invalid withdrawal credentials length: {} bytes, expected 32", - request.withdrawal_credentials.len() - ); - continue; // Skip this deposit - } - // Check prefix is 0x01 (Eth1 withdrawal) - if request.withdrawal_credentials[0] != 0x01 { - warn!( - "Invalid withdrawal credentials prefix: 0x{:02x}, expected 0x01", - request.withdrawal_credentials[0] - ); - continue; // Skip this deposit - } - // 
Check 11 zero bytes after the prefix - if !request.withdrawal_credentials[1..12] - .iter() - .all(|&b| b == 0) - { - warn!( - "Invalid withdrawal credentials format: non-zero bytes in positions 1-11" - ); - continue; // Skip this deposit - } + // If we can't find the parent, buffer as orphaned + let Some(mut fork_state) = parent_state else { + debug!( + height, + ?parent_digest, + "buffering orphaned notarized block - parent not found" + ); + self.orphaned_blocks + .entry(height) + .or_default() + .entry(parent_digest) + .or_default() + .push(block); + continue; + }; - // Create new ValidatorAccount from DepositRequest - let new_account = ValidatorAccount { - consensus_public_key: request.consensus_pubkey.clone(), - withdrawal_credentials: Address::from_slice( - &request.withdrawal_credentials[12..32], - ), // Take last 20 bytes - balance: request.amount, - pending_withdrawal_amount: 0, - status: ValidatorStatus::Inactive, - last_deposit_index: request.index, - }; - self.state.set_account( - request.node_pubkey.as_ref().try_into().unwrap(), - new_account, - ); - validator_balance = request.amount; - } - if !account_exists && validator_balance >= self.validator_minimum_stake { - // If the node shuts down, before the account changes are committed, - // then everything should work normally, because the registry is not persisted to disk - self.state - .added_validators - .push(request.node_pubkey.clone()); - } - #[cfg(debug_assertions)] - { - use commonware_codec::Encode; - let gauge: Gauge = Gauge::default(); - gauge.set(validator_balance as i64); - self.context.register( - format!("{}{}{}_deposit_validator_balance", - !account_exists && validator_balance >= self.validator_minimum_stake, - hex::encode(request.withdrawal_credentials), hex::encode(request.node_pubkey.encode())), - "Validator balance", - gauge - ); - } - } - } - } + // Execute the block into the cloned parent state + execute_block( + &self.engine_client, + &self.context, + &block, + &mut fork_state, + self.epoch_num_of_blocks, + self.validator_max_withdrawals_per_block, + self.protocol_version_digest, + self.validator_minimum_stake, + self.validator_withdrawal_period, + self.validator_onboarding_limit_per_block, + ) + .await; - // Remove pending withdrawals that are included in the committed block - for withdrawal in &block.payload.payload_inner.withdrawals { - let pending_withdrawal = self.state.pop_withdrawal(); - // TODO(matthias): these checks should never fail. we have to make sure that these withdrawals are - // verified when the block is verified. it is too late when the block is committed. 
- let pending_withdrawal = - pending_withdrawal.expect("pending withdrawal must be in state"); - assert_eq!(pending_withdrawal.inner, *withdrawal); - - if let Some(mut account) = self.state.get_account(&pending_withdrawal.pubkey).cloned() - && account.balance >= withdrawal.amount - { - // This check should never fail, because we checked the balance when - // adding the pending withdrawal to the queue - account.balance = account.balance.saturating_sub(withdrawal.amount); - account.pending_withdrawal_amount = account - .pending_withdrawal_amount - .saturating_sub(withdrawal.amount); + // Store the new fork state + self.fork_states.entry(height).or_default().insert( + block_digest, + ForkState { + block_digest, + consensus_state: fork_state.clone(), + }, + ); - #[cfg(debug_assertions)] - { - let gauge: Gauge = Gauge::default(); - gauge.set(account.balance as i64); - self.context.register( - format!( - "{}{}_withdrawal_validator_balance", - hex::encode(account.withdrawal_credentials), - hex::encode(pending_withdrawal.pubkey) - ), - "Validator balance", - gauge, - ); - } + // Commit this fork to reth so validators can build/verify blocks on top of it + // Keep the canonical finalized chain unchanged by using canonical finalized hash + let fork_forkchoice = ForkchoiceState { + head_block_hash: fork_state.forkchoice.head_block_hash, + safe_block_hash: self.canonical_state.forkchoice.finalized_block_hash, + finalized_block_hash: self.canonical_state.forkchoice.finalized_block_hash, + }; + self.engine_client.commit_hash(fork_forkchoice).await; - // If the remaining balance is 0, remove the validator account from the state. - if account.balance == 0 { - self.state.remove_account(&pending_withdrawal.pubkey); - self.state - .removed_validators - .push(PublicKey::decode(&pending_withdrawal.pubkey[..]).unwrap()); // todo(dalton) remove unwrap - } else { - self.state.set_account(pending_withdrawal.pubkey, account); - } + info!(height, ?block_digest, "executed notarized block into fork"); + self.height_notify_up_to(height, block_digest); + + // Add orphaned children to the processing queue + if let Some(children) = self + .orphaned_blocks + .get(&(height + 1)) + .and_then(|children_map| children_map.get(&block_digest)) + { + debug!( + height, + num_children = children.len(), + "queueing orphaned children" + ); + to_process.extend(children.clone()); } } } - fn height_notify_up_to(&mut self, current_height: u64) { - // Split off all entries <= current_height - let to_notify = self.pending_height_notifys.split_off(&(current_height + 1)); - // The original map now contains only entries > current_height - // Swap them back - let remaining = std::mem::replace(&mut self.pending_height_notifys, to_notify); - - // Notify all the split-off entries - for (_, senders) in remaining { + fn height_notify_up_to(&mut self, height: u64, block_digest: Digest) { + // Notify only waiters for this specific (height, digest) pair + if let Some(senders) = self.pending_height_notifys.remove(&(height, block_digest)) { for sender in senders { let _ = sender.send(()); // Ignore if receiver dropped } @@ -816,14 +653,30 @@ impl< async fn handle_aux_data_mailbox( &mut self, height: u64, + parent_digest: Digest, sender: oneshot::Sender, ) { + // We're building a block at `height`, so we need state from parent at `height - 1` + let parent_height = height - 1; + + // Look up the specific parent block's state + let state = if let Some(fork_state) = self + .fork_states + .get(&parent_height) + .and_then(|forks| forks.get(&parent_digest)) + { 
+ &fork_state.consensus_state + } else { + // If not in forks, it must be canonical (or parent height = 0) + &self.canonical_state + }; + // Create checkpoint if we're at an epoch boundary. // The consensus state is saved every `epoch_num_blocks` blocks. // The proposed block will contain the checkpoint that was saved at the previous height. let aux_data = if is_last_block_of_epoch(self.epoch_num_of_blocks, height) { // TODO(matthias): revisit this expect when the ckpt isn't in the DB - let checkpoint_hash = if let Some(checkpoint) = &self.state.pending_checkpoint { + let checkpoint_hash = if let Some(checkpoint) = &state.pending_checkpoint { checkpoint.digest } else { unreachable!("pending checkpoint was calculated at the previous height") @@ -840,27 +693,26 @@ impl< }; // Only submit withdrawals at the end of an epoch - let ready_withdrawals = self - .state - .get_next_ready_withdrawals(height, self.validator_max_withdrawals_per_block); + let ready_withdrawals = + state.get_next_ready_withdrawals(height, self.validator_max_withdrawals_per_block); BlockAuxData { - epoch: self.state.epoch, + epoch: state.epoch, withdrawals: ready_withdrawals, checkpoint_hash: Some(checkpoint_hash), header_hash: prev_header_hash, - added_validators: self.state.added_validators.clone(), - removed_validators: self.state.removed_validators.clone(), - forkchoice: self.state.forkchoice, + added_validators: state.added_validators.clone(), + removed_validators: state.removed_validators.clone(), + forkchoice: state.forkchoice, } } else { BlockAuxData { - epoch: self.state.epoch, + epoch: state.epoch, withdrawals: vec![], checkpoint_hash: None, header_hash: [0; 32].into(), added_validators: vec![], removed_validators: vec![], - forkchoice: self.state.forkchoice, + forkchoice: state.forkchoice, } }; let _ = sender.send(aux_data); @@ -881,7 +733,7 @@ impl< let _ = sender.send(ConsensusStateResponse::Checkpoint(checkpoint)); } ConsensusStateRequest::GetLatestHeight => { - let height = self.state.get_latest_height(); + let height = self.canonical_state.get_latest_height(); let _ = sender.send(ConsensusStateResponse::LatestHeight(height)); } ConsensusStateRequest::GetValidatorBalance(public_key) => { @@ -889,7 +741,7 @@ impl< key_bytes.copy_from_slice(&public_key); let balance = self - .state + .canonical_state .validator_accounts .get(&key_bytes) .map(|account| account.balance); @@ -899,6 +751,456 @@ impl< } } +/// Core execution logic that applies a block's state transitions to any ConsensusState. +/// +/// This method: +/// - Calls check_payload on the engine client (validates and optimistically executes the block on the EVM) +/// - Applies consensus-layer state transitions (deposits, withdrawals, validators) +/// - Updates the forkchoice head +/// - Creates checkpoints at epoch boundaries +/// +/// This does NOT handle epoch transitions (activate validators, increment epoch). +/// Epoch transitions only happen at finalization since the last block of an epoch +/// is always finalized (never notarized+nullified). 
+#[allow(clippy::too_many_arguments)] +async fn execute_block< + C: EngineClient, + S: Signer, + V: Variant, + R: Storage + Metrics + Clock + Spawner + governor::clock::Clock + Rng, +>( + engine_client: &C, + context: &ContextCell, + block: &Block, + state: &mut ConsensusState, + epoch_num_of_blocks: u64, + validator_max_withdrawals_per_block: usize, + protocol_version_digest: Digest, + validator_minimum_stake: u64, + validator_withdrawal_period: u64, + validator_onboarding_limit_per_block: usize, +) { + #[cfg(feature = "prom")] + let block_processing_start = Instant::now(); + + // check the payload + #[cfg(feature = "prom")] + let payload_check_start = Instant::now(); + let payload_status = engine_client.check_payload(block).await; + let new_height = block.height(); + + #[cfg(feature = "prom")] + { + let payload_check_duration = payload_check_start.elapsed().as_millis() as f64; + histogram!("payload_check_duration_millis").record(payload_check_duration); + } + + // Verify withdrawal requests that were included in the block + // Make sure that the included withdrawals match the expected withdrawals + let expected_withdrawals: Vec<Withdrawal> = + if is_last_block_of_epoch(epoch_num_of_blocks, new_height) { + let pending_withdrawals = + state.get_next_ready_withdrawals(new_height, validator_max_withdrawals_per_block); + pending_withdrawals.into_iter().map(|w| w.inner).collect() + } else { + vec![] + }; + + // Validate block against state + if payload_status.is_valid() + && block.payload.payload_inner.withdrawals == expected_withdrawals + && state.forkchoice.head_block_hash == block.eth_parent_hash() + { + let eth_hash = block.eth_block_hash(); + info!( + "Committing block 0x{} for height {}", + hex(&eth_hash), + new_height + ); + + state.forkchoice.head_block_hash = eth_hash.into(); + + // Parse execution requests + #[cfg(feature = "prom")] + let parse_requests_start = Instant::now(); + parse_execution_requests( + context, + block, + new_height, + state, + protocol_version_digest, + validator_minimum_stake, + validator_withdrawal_period, + ) + .await; + + #[cfg(feature = "prom")] + { + let parse_requests_duration = parse_requests_start.elapsed().as_millis() as f64; + histogram!("parse_execution_requests_duration_millis").record(parse_requests_duration); + } + + // Add validators that deposited to the validator set + #[cfg(feature = "prom")] + let process_requests_start = Instant::now(); + process_execution_requests( + context, + block, + new_height, + state, + epoch_num_of_blocks, + validator_onboarding_limit_per_block, + validator_minimum_stake, + ) + .await; + #[cfg(feature = "prom")] + { + let process_requests_duration = process_requests_start.elapsed().as_millis() as f64; + histogram!("process_execution_requests_duration_millis") + .record(process_requests_duration); + } + } else { + warn!( + "Height: {new_height} contains invalid eth payload. 
Not executing but keeping part of chain" + ); + } + + #[cfg(debug_assertions)] + { + let gauge: Gauge = Gauge::default(); + gauge.set(new_height as i64); + context.register("height", "chain height", gauge); + } + state.set_latest_height(new_height); + state.set_view(block.view()); + state.head_digest = block.digest(); + assert_eq!(block.epoch(), state.epoch); + + // Periodically persist state to database as a blob + // We build the checkpoint one height before the epoch end which + // allows the validators to sign the checkpoint hash in the last block + // of the epoch + if is_penultimate_block_of_epoch(epoch_num_of_blocks, new_height) { + #[cfg(feature = "prom")] + let checkpoint_creation_start = Instant::now(); + + let checkpoint = Checkpoint::new(state); + state.pending_checkpoint = Some(checkpoint); + + #[cfg(feature = "prom")] + { + let checkpoint_creation_duration = + checkpoint_creation_start.elapsed().as_millis() as f64; + histogram!("checkpoint_creation_duration_millis").record(checkpoint_creation_duration); + } + } + + #[cfg(feature = "prom")] + { + let total_block_processing_duration = block_processing_start.elapsed().as_millis() as f64; + histogram!("total_block_processing_duration_millis") + .record(total_block_processing_duration); + counter!("blocks_processed_total").increment(1); + } +} + +async fn parse_execution_requests< + S: Signer, + V: Variant, + R: Storage + Metrics + Clock + Spawner + governor::clock::Clock + Rng, +>( + #[allow(unused)] context: &ContextCell, + block: &Block, + new_height: u64, + state: &mut ConsensusState, + protocol_version_digest: Digest, + validator_minimum_stake: u64, + validator_withdrawal_period: u64, +) { + for request_bytes in &block.execution_requests { + match ExecutionRequest::try_from_eth_bytes(request_bytes.as_ref()) { + Ok(execution_request) => { + match execution_request { + ExecutionRequest::Deposit(deposit_request) => { + let message = deposit_request.as_message(protocol_version_digest); + + let mut node_signature_bytes = &deposit_request.node_signature[..]; + let Ok(node_signature) = Signature::read(&mut node_signature_bytes) else { + info!( + "Failed to parse node signature from deposit request: {deposit_request:?}" + ); + continue; // Skip this deposit request + }; + if !deposit_request + .node_pubkey + .verify(&[], &message, &node_signature) + { + #[cfg(debug_assertions)] + { + let gauge: Gauge = Gauge::default(); + gauge.set(new_height as i64); + context.register( + format!( + "{}_deposit_request_invalid_node_sig", + hex::encode(&deposit_request.node_pubkey) + ), + "height", + gauge, + ); + } + info!( + "Failed to verify node signature from deposit request: {deposit_request:?}" + ); + continue; // Skip this deposit request + } + + let mut consensus_signature_bytes = + &deposit_request.consensus_signature[..]; + let Ok(consensus_signature) = + bls12381::Signature::read(&mut consensus_signature_bytes) + else { + info!( + "Failed to parse consensus signature from deposit request: {deposit_request:?}" + ); + continue; // Skip this deposit request + }; + if !deposit_request.consensus_pubkey.verify( + &[], + &message, + &consensus_signature, + ) { + #[cfg(debug_assertions)] + { + let gauge: Gauge = Gauge::default(); + gauge.set(new_height as i64); + context.register( + format!( + "{}_deposit_request_invalid_consensus_sig", + hex::encode(&deposit_request.consensus_pubkey) + ), + "height", + gauge, + ); + } + info!( + "Failed to verify consensus signature from deposit request: {deposit_request:?}" + ); + continue; // Skip this deposit 
request + } + state.push_deposit(deposit_request); + } + ExecutionRequest::Withdrawal(mut withdrawal_request) => { + // Only add the withdrawal request if the validator exists and has sufficient balance + if let Some(mut account) = state + .get_account(&withdrawal_request.validator_pubkey) + .cloned() + { + // If the validator already submitted an exit request, we skip this withdrawal request + if matches!(account.status, ValidatorStatus::SubmittedExitRequest) { + info!( + "Failed to parse withdrawal request because the validator already submitted a request: {withdrawal_request:?}" + ); + continue; // Skip this withdrawal request + } + + // The balance minus any pending withdrawals has to be at least the amount of the withdrawal request + if account.balance - account.pending_withdrawal_amount + < withdrawal_request.amount + { + info!( + "Failed to parse withdrawal request due to insufficient balance: {withdrawal_request:?}" + ); + continue; // Skip this withdrawal request + } + + // The source address must match the validator's withdrawal address + if withdrawal_request.source_address != account.withdrawal_credentials { + info!( + "Failed to parse withdrawal request because the source address doesn't match the withdrawal credentials: {withdrawal_request:?}" + ); + continue; // Skip this withdrawal request + } + // If after this withdrawal the validator balance would be less than the + // minimum stake, then the full validator balance is withdrawn. + if account.balance + - account.pending_withdrawal_amount + - withdrawal_request.amount + < validator_minimum_stake + { + // Check the remaining balance and set the withdrawal amount accordingly + let remaining_balance = + account.balance - account.pending_withdrawal_amount; + withdrawal_request.amount = remaining_balance; + account.status = ValidatorStatus::SubmittedExitRequest; + } + + account.pending_withdrawal_amount += withdrawal_request.amount; + state.set_account(withdrawal_request.validator_pubkey, account); + state.push_withdrawal_request( + withdrawal_request.clone(), + new_height + validator_withdrawal_period, + ); + } + } + } + } + Err(e) => { + warn!("Failed to parse execution request: {}", e); + } + } + } +} + +async fn process_execution_requests< + S: Signer, + V: Variant, + R: Storage + Metrics + Clock + Spawner + governor::clock::Clock + Rng, +>( + #[allow(unused)] context: &ContextCell, + block: &Block, + new_height: u64, + state: &mut ConsensusState, + epoch_num_of_blocks: u64, + validator_onboarding_limit_per_block: usize, + validator_minimum_stake: u64, +) { + if is_penultimate_block_of_epoch(epoch_num_of_blocks, new_height) { + for _ in 0..validator_onboarding_limit_per_block { + if let Some(request) = state.pop_deposit() { + let mut validator_balance = 0; + let mut account_exists = false; + if let Some(mut account) = state + .get_account(request.node_pubkey.as_ref().try_into().unwrap()) + .cloned() + { + if request.index > account.last_deposit_index { + account.balance += request.amount; + account.last_deposit_index = request.index; + #[allow(unused)] + #[cfg(debug_assertions)] + { + validator_balance = account.balance; + } + validator_balance = account.balance; + state + .set_account(request.node_pubkey.as_ref().try_into().unwrap(), account); + account_exists = true; + } + } else { + // Validate the withdrawal credentials format + // Eth1 withdrawal credentials: 0x01 + 11 zero bytes + 20 bytes Ethereum address + if request.withdrawal_credentials.len() != 32 { + warn!( 
"Invalid withdrawal credentials length: {} bytes, expected 32", + request.withdrawal_credentials.len() + ); + continue; // Skip this deposit + } + // Check prefix is 0x01 (Eth1 withdrawal) + if request.withdrawal_credentials[0] != 0x01 { + warn!( + "Invalid withdrawal credentials prefix: 0x{:02x}, expected 0x01", + request.withdrawal_credentials[0] + ); + continue; // Skip this deposit + } + // Check 11 zero bytes after the prefix + if !request.withdrawal_credentials[1..12] + .iter() + .all(|&b| b == 0) + { + warn!( + "Invalid withdrawal credentials format: non-zero bytes in positions 1-11" + ); + continue; // Skip this deposit + } + + // Create new ValidatorAccount from DepositRequest + let new_account = ValidatorAccount { + consensus_public_key: request.consensus_pubkey.clone(), + withdrawal_credentials: Address::from_slice( + &request.withdrawal_credentials[12..32], + ), // Take last 20 bytes + balance: request.amount, + pending_withdrawal_amount: 0, + status: ValidatorStatus::Inactive, + last_deposit_index: request.index, + }; + state.set_account( + request.node_pubkey.as_ref().try_into().unwrap(), + new_account, + ); + validator_balance = request.amount; + } + if !account_exists && validator_balance >= validator_minimum_stake { + // If the node shuts down, before the account changes are committed, + // then everything should work normally, because the registry is not persisted to disk + state.added_validators.push(request.node_pubkey.clone()); + } + #[cfg(debug_assertions)] + { + use commonware_codec::Encode; + let gauge: Gauge = Gauge::default(); + gauge.set(validator_balance as i64); + context.register( + format!("{}{}{}_deposit_validator_balance", + !account_exists && validator_balance >= validator_minimum_stake, + hex::encode(request.withdrawal_credentials), hex::encode(request.node_pubkey.encode())), + "Validator balance", + gauge + ); + } + } + } + } + + // Remove pending withdrawals that are included in the committed block + for withdrawal in &block.payload.payload_inner.withdrawals { + let pending_withdrawal = state.pop_withdrawal(); + // TODO(matthias): these checks should never fail. we have to make sure that these withdrawals are + // verified when the block is verified. it is too late when the block is committed. + let pending_withdrawal = pending_withdrawal.expect("pending withdrawal must be in state"); + assert_eq!(pending_withdrawal.inner, *withdrawal); + + if let Some(mut account) = state.get_account(&pending_withdrawal.pubkey).cloned() + && account.balance >= withdrawal.amount + { + // This check should never fail, because we checked the balance when + // adding the pending withdrawal to the queue + account.balance = account.balance.saturating_sub(withdrawal.amount); + account.pending_withdrawal_amount = account + .pending_withdrawal_amount + .saturating_sub(withdrawal.amount); + + #[cfg(debug_assertions)] + { + let gauge: Gauge = Gauge::default(); + gauge.set(account.balance as i64); + context.register( + format!( + "{}{}_withdrawal_validator_balance", + hex::encode(account.withdrawal_credentials), + hex::encode(pending_withdrawal.pubkey) + ), + "Validator balance", + gauge, + ); + } + + // If the remaining balance is 0, remove the validator account from the state. 
+ if account.balance == 0 { + state.remove_account(&pending_withdrawal.pubkey); + state + .removed_validators + .push(PublicKey::decode(&pending_withdrawal.pubkey[..]).unwrap()); // todo(dalton) remove unwrap + } else { + state.set_account(pending_withdrawal.pubkey, account); + } + } + } +} + impl< R: Storage + Metrics + Clock + Spawner + governor::clock::Clock + Rng, C: EngineClient, diff --git a/finalizer/src/ingress.rs b/finalizer/src/ingress.rs index da1cc7c..90c0b3b 100644 --- a/finalizer/src/ingress.rs +++ b/finalizer/src/ingress.rs @@ -7,7 +7,7 @@ use futures::{ }; use summit_syncer::Update; use summit_types::{ - Block, BlockAuxData, PublicKey, + Block, BlockAuxData, Digest, PublicKey, checkpoint::Checkpoint, consensus_state_query::{ConsensusStateRequest, ConsensusStateResponse}, }; @@ -16,10 +16,12 @@ use summit_types::{ pub enum FinalizerMessage { NotifyAtHeight { height: u64, + block_digest: Digest, response: oneshot::Sender<()>, }, GetAuxData { height: u64, + parent_digest: Digest, response: oneshot::Sender, }, GetEpochGenesisHash { @@ -45,20 +47,36 @@ impl FinalizerMailbox { Self { sender } } - pub async fn notify_at_height(&mut self, height: u64) -> oneshot::Receiver<()> { + pub async fn notify_at_height( + &mut self, + height: u64, + block_digest: Digest, + ) -> oneshot::Receiver<()> { let (response, receiver) = oneshot::channel(); self.sender - .send(FinalizerMessage::NotifyAtHeight { height, response }) + .send(FinalizerMessage::NotifyAtHeight { + height, + block_digest, + response, + }) .await .expect("Unable to send to main Finalizer loop"); receiver } - pub async fn get_aux_data(&mut self, height: u64) -> oneshot::Receiver { + pub async fn get_aux_data( + &mut self, + height: u64, + parent_digest: Digest, + ) -> oneshot::Receiver { let (response, receiver) = oneshot::channel(); self.sender - .send(FinalizerMessage::GetAuxData { height, response }) + .send(FinalizerMessage::GetAuxData { + height, + parent_digest, + response, + }) .await .expect("Unable to send to main Finalizer loop"); diff --git a/syncer/src/actor.rs b/syncer/src/actor.rs index ca3ee01..7402ada 100644 --- a/syncer/src/actor.rs +++ b/syncer/src/actor.rs @@ -424,8 +424,10 @@ where // Search for block locally, otherwise fetch it remotely if let Some(block) = self.find_block(&mut buffer, commitment).await { - // If found, persist the block - self.cache_block(round, commitment, block).await; + // If found, persist the block and send to application + self.cache_block(round, commitment, block.clone()).await; + application.report(Update::NotarizedBlock(block.clone())).await; + self.notify_subscribers(commitment, &block).await; } else { debug!(?round, "notarized block missing"); resolver.fetch(Request::::Notarized { round }).await; @@ -752,8 +754,9 @@ where } // Cache the notarization and block - self.cache_block(round, commitment, block).await; + self.cache_block(round, commitment, block.clone()).await; self.cache.put_notarization(round, commitment, notarization).await; + self.notify_subscribers(commitment, &block).await; }, } }, @@ -811,10 +814,12 @@ where if utils::is_last_block_in_epoch(self.epoch_length, next_height).is_some() { let finalize = self.get_finalization_by_height(next_height).await; application - .report(Update::Block((block, finalize), ack)) + .report(Update::FinalizedBlock((block, finalize), ack)) .await; } else { - application.report(Update::Block((block, None), ack)).await; + application + .report(Update::FinalizedBlock((block, None), ack)) + .await; } self.pending_ack.replace(PendingAck { 
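The finalizer half of this change is easiest to see in isolation: notarized blocks are executed speculatively into per-fork copies of the consensus state, keyed by (height, block digest), and the winning fork is adopted wholesale at finalization. Below is an illustrative, self-contained sketch of that bookkeeping, not part of the patch; `Digest` is reduced to `[u8; 32]`, `ConsensusState` to the two fields the lookup needs, and the name `ForkTracker` is invented for the example.

```rust
use std::collections::{BTreeMap, HashMap};

// Simplified stand-ins; the real Digest and ConsensusState are richer types.
type Digest = [u8; 32];

#[derive(Clone, Debug, Default)]
struct ConsensusState {
    latest_height: u64,
    head_digest: Digest,
}

#[derive(Clone, Debug)]
struct ForkState {
    block_digest: Digest,
    consensus_state: ConsensusState,
}

#[derive(Default)]
struct ForkTracker {
    canonical: ConsensusState,
    // height -> (block digest -> post-execution state of that notarized block)
    forks: BTreeMap<u64, HashMap<Digest, ForkState>>,
}

impl ForkTracker {
    /// Resolve the state a child block at `height` should be executed against.
    fn parent_state(&self, height: u64, parent_digest: &Digest) -> Option<ConsensusState> {
        debug_assert!(height > 0);
        if height == self.canonical.latest_height + 1 {
            // Parent is the finalized head; a mismatching digest means a dead fork
            // (the check is skipped at genesis, mirroring the diff).
            (self.canonical.latest_height == 0 || &self.canonical.head_digest == parent_digest)
                .then(|| self.canonical.clone())
        } else {
            // Otherwise the parent must itself be a tracked notarized fork.
            self.forks
                .get(&(height - 1))?
                .get(parent_digest)
                .map(|fork| fork.consensus_state.clone())
        }
    }

    /// Record the post-execution state of a notarized block.
    fn insert(&mut self, height: u64, block_digest: Digest, consensus_state: ConsensusState) {
        self.forks
            .entry(height)
            .or_default()
            .insert(block_digest, ForkState { block_digest, consensus_state });
    }

    /// On finalization: adopt the winning fork if it was executed at notarization
    /// time, then prune every fork at or below the finalized height.
    fn finalize(&mut self, height: u64, block_digest: &Digest) {
        if let Some(fork) = self.forks.get(&height).and_then(|f| f.get(block_digest)) {
            self.canonical = fork.consensus_state.clone();
        }
        self.forks.retain(|&h, _| h > height);
    }
}
```

`handle_notarized_block` above follows the same shape: look up the parent state, clone it, run `execute_block` into the clone, store the result as a `ForkState`, and buffer blocks whose parent is not yet known in `orphaned_blocks` until the parent arrives.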
diff --git a/syncer/src/lib.rs b/syncer/src/lib.rs index 0e14e22..d4e23c7 100644 --- a/syncer/src/lib.rs +++ b/syncer/src/lib.rs @@ -76,10 +76,11 @@ use commonware_consensus::simplex::signing_scheme::Scheme; use commonware_consensus::simplex::types::Finalization; use commonware_utils::{Acknowledgement, acknowledgement::Exact}; -/// An update reported to the application, either a new finalized tip or a finalized block. +/// An update reported to the application: finalized tips, finalized blocks, or notarized blocks. /// /// Finalized tips are reported as soon as known, whether or not we hold all blocks up to that height. /// Finalized blocks are reported to the application in monotonically increasing order (no gaps permitted). +/// Notarized blocks are sent without ordering guarantees to enable execution before finalization. #[derive(Clone, Debug)] pub enum Update { /// A new finalized tip. @@ -92,7 +93,12 @@ pub enum Update { /// /// Because the [Acknowledgement] is clonable, the application can pass [Update] to multiple consumers /// (and marshal will only consider the block delivered once all consumers have acknowledged it). - Block((B, Option>), A), + FinalizedBlock((B, Option>), A), + /// A notarized (but not yet finalized) block. + /// + /// These blocks do not require acknowledgement and may arrive out of order. They enable proposers + /// to build on notarized blocks without waiting for finalization. + NotarizedBlock(B), } #[cfg(test)] diff --git a/syncer/src/mocks/application.rs b/syncer/src/mocks/application.rs index a84bfa4..a52a2ef 100644 --- a/syncer/src/mocks/application.rs +++ b/syncer/src/mocks/application.rs @@ -46,10 +46,13 @@ impl Reporter for Application { Update::Tip(height, commitment) => { *self.tip.lock().unwrap() = Some((height, commitment)); } - Update::Block((block, _), ack_tx) => { + Update::FinalizedBlock((block, _), ack_tx) => { self.blocks.lock().unwrap().insert(block.height(), block); ack_tx.acknowledge(); } + Update::NotarizedBlock(_block) => { + // Mock application ignores notarized blocks + } } } } diff --git a/types/src/checkpoint.rs b/types/src/checkpoint.rs index dc743c1..4ded295 100644 --- a/types/src/checkpoint.rs +++ b/types/src/checkpoint.rs @@ -130,7 +130,7 @@ mod tests { use crate::checkpoint::Checkpoint; use crate::consensus_state::ConsensusState; use commonware_codec::DecodeExt; - use commonware_cryptography::{PrivateKeyExt, Signer, bls12381}; + use commonware_cryptography::{PrivateKeyExt, Signer, bls12381, sha256}; use ssz::{Decode, Encode}; use std::collections::{HashMap, VecDeque}; @@ -149,6 +149,7 @@ mod tests { epoch: 0, view: 0, latest_height: 10, + head_digest: commonware_cryptography::sha256::Digest([0u8; 32]), next_withdrawal_index: 100, deposit_queue: VecDeque::new(), withdrawal_queue: VecDeque::new(), @@ -254,6 +255,7 @@ mod tests { epoch: 0, view: 0, latest_height: 1000, + head_digest: sha256::Digest([0u8; 32]), next_withdrawal_index: 200, deposit_queue, withdrawal_queue, @@ -289,6 +291,7 @@ mod tests { epoch: 0, view: 0, latest_height: 42, + head_digest: sha256::Digest([0u8; 32]), next_withdrawal_index: 99, deposit_queue: VecDeque::new(), withdrawal_queue: VecDeque::new(), @@ -401,6 +404,7 @@ mod tests { epoch: 0, view: 0, latest_height: 2000, + head_digest: sha256::Digest([0u8; 32]), next_withdrawal_index: 300, deposit_queue, withdrawal_queue, @@ -441,6 +445,7 @@ mod tests { epoch: 0, view: 0, latest_height: 42, + head_digest: sha256::Digest([0u8; 32]), next_withdrawal_index: 99, deposit_queue: VecDeque::new(), withdrawal_queue: 
VecDeque::new(), @@ -487,6 +492,7 @@ mod tests { epoch: 0, view: 0, latest_height: 42, + head_digest: sha256::Digest([0u8; 32]), next_withdrawal_index: 99, deposit_queue: VecDeque::new(), withdrawal_queue: VecDeque::new(), @@ -529,6 +535,7 @@ mod tests { epoch: 0, view: 0, latest_height: 42, + head_digest: sha256::Digest([0u8; 32]), next_withdrawal_index: 99, deposit_queue: VecDeque::new(), withdrawal_queue: VecDeque::new(), @@ -612,6 +619,7 @@ mod tests { epoch: 0, view: 0, latest_height: 1000, + head_digest: sha256::Digest([0u8; 32]), next_withdrawal_index: 200, deposit_queue, withdrawal_queue, diff --git a/types/src/consensus_state.rs b/types/src/consensus_state.rs index f36d510..fbbd53f 100644 --- a/types/src/consensus_state.rs +++ b/types/src/consensus_state.rs @@ -1,20 +1,21 @@ -use crate::PublicKey; use crate::account::{ValidatorAccount, ValidatorStatus}; use crate::checkpoint::Checkpoint; use crate::execution_request::{DepositRequest, WithdrawalRequest}; use crate::withdrawal::PendingWithdrawal; +use crate::{Digest, PublicKey}; use alloy_eips::eip4895::Withdrawal; use alloy_rpc_types_engine::ForkchoiceState; use bytes::{Buf, BufMut}; use commonware_codec::{DecodeExt, EncodeSize, Error, Read, ReadExt, Write}; -use commonware_cryptography::bls12381; +use commonware_cryptography::{bls12381, sha256}; use std::collections::{HashMap, VecDeque}; -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub struct ConsensusState { pub epoch: u64, pub view: u64, pub latest_height: u64, + pub head_digest: Digest, pub next_withdrawal_index: u64, pub deposit_queue: VecDeque, pub withdrawal_queue: VecDeque, @@ -26,6 +27,26 @@ pub struct ConsensusState { pub epoch_genesis_hash: [u8; 32], } +impl Default for ConsensusState { + fn default() -> Self { + Self { + epoch: 0, + view: 0, + latest_height: 0, + head_digest: sha256::Digest([0u8; 32]), + next_withdrawal_index: 0, + deposit_queue: Default::default(), + withdrawal_queue: Default::default(), + validator_accounts: Default::default(), + pending_checkpoint: None, + added_validators: Vec::new(), + removed_validators: Vec::new(), + forkchoice: Default::default(), + epoch_genesis_hash: [0u8; 32], + } + } +} + impl ConsensusState { pub fn new(forkchoice: ForkchoiceState) -> Self { Self { @@ -251,6 +272,7 @@ impl EncodeSize for ConsensusState { + 32 // forkchoice.safe_block_hash + 32 // forkchoice.finalized_block_hash + 32 // epoch_genesis_hash + + 32 // head_digest } } @@ -323,10 +345,15 @@ impl Read for ConsensusState { let mut epoch_genesis_hash = [0u8; 32]; buf.copy_to_slice(&mut epoch_genesis_hash); + let mut head_digest_bytes = [0u8; 32]; + buf.copy_to_slice(&mut head_digest_bytes); + let head_digest = sha256::Digest(head_digest_bytes); + Ok(Self { epoch, view, latest_height, + head_digest, next_withdrawal_index, deposit_queue, withdrawal_queue, @@ -390,6 +417,9 @@ impl Write for ConsensusState { // Write epoch_genesis_hash buf.put_slice(&self.epoch_genesis_hash); + + // Write head_digest + buf.put_slice(&self.head_digest.0); } }
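One behavioral consequence of keying `pending_height_notifys` by `(height, digest)` deserves a note: a waiter is woken only when that exact block executes, so a waiter registered for a block on a losing fork is never notified and must be abandoned by the caller (the application side already races `handle_proposal` against a consensus timeout, which covers this case). A minimal std-only sketch of those semantics, with `std::sync::mpsc` standing in for the oneshot channels used in the real code:

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{channel, Receiver, Sender};

type Digest = [u8; 32]; // stand-in for the real digest type

#[derive(Default)]
struct Notifier {
    // Waiters keyed by the exact (height, digest) pair they care about.
    pending: BTreeMap<(u64, Digest), Vec<Sender<()>>>,
}

impl Notifier {
    fn subscribe(&mut self, height: u64, digest: Digest) -> Receiver<()> {
        let (tx, rx) = channel();
        self.pending.entry((height, digest)).or_default().push(tx);
        rx
    }

    /// Wake only the waiters registered for this exact block.
    fn notify(&mut self, height: u64, digest: Digest) {
        if let Some(senders) = self.pending.remove(&(height, digest)) {
            for tx in senders {
                let _ = tx.send(()); // ignore dropped receivers
            }
        }
    }
}

fn main() {
    let mut n = Notifier::default();
    let winning = n.subscribe(5, [0xaa; 32]);
    let losing = n.subscribe(5, [0xbb; 32]);
    n.notify(5, [0xaa; 32]);
    assert!(winning.try_recv().is_ok());
    assert!(losing.try_recv().is_err()); // a different fork at the same height stays pending
}
```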