From e5db20471fb2237f12d5fa9b45aeb6ad4abba164 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Mon, 27 Oct 2025 04:33:12 +0000 Subject: [PATCH 1/8] reorder multiproof.rs --- .../src/tree/payload_processor/multiproof.rs | 594 ++++++++---------- 1 file changed, 261 insertions(+), 333 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 755f7a7d0d7..0baafeda6ca 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -1,11 +1,7 @@ //! Multiproof task related functionality. use alloy_evm::block::StateChangeSource; -use alloy_primitives::{ - keccak256, - map::{B256Set, HashSet}, - B256, -}; +use alloy_primitives::{keccak256, map::HashSet, B256}; use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; use dashmap::DashMap; use derive_more::derive::Deref; @@ -24,287 +20,6 @@ use reth_trie_parallel::{ use std::{collections::BTreeMap, ops::DerefMut, sync::Arc, time::Instant}; use tracing::{debug, error, instrument, trace}; -/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the -/// state. -#[derive(Default, Debug)] -pub struct SparseTrieUpdate { - /// The state update that was used to calculate the proof - pub(crate) state: HashedPostState, - /// The calculated multiproof - pub(crate) multiproof: DecodedMultiProof, -} - -impl SparseTrieUpdate { - /// Returns true if the update is empty. - pub(super) fn is_empty(&self) -> bool { - self.state.is_empty() && self.multiproof.is_empty() - } - - /// Construct update from multiproof. - #[cfg(test)] - pub(super) fn from_multiproof(multiproof: reth_trie::MultiProof) -> alloy_rlp::Result { - Ok(Self { multiproof: multiproof.try_into()?, ..Default::default() }) - } - - /// Extend update with contents of the other. - pub(super) fn extend(&mut self, other: Self) { - self.state.extend(other.state); - self.multiproof.extend(other.multiproof); - } -} - -/// Common configuration for multi proof tasks -#[derive(Debug, Clone)] -pub(super) struct MultiProofConfig { - /// The sorted collection of cached in-memory intermediate trie nodes that - /// can be reused for computation. - pub nodes_sorted: Arc, - /// The sorted in-memory overlay hashed state. - pub state_sorted: Arc, - /// The collection of prefix sets for the computation. Since the prefix sets _always_ - /// invalidate the in-memory nodes, not all keys from `state_sorted` might be present here, - /// if we have cached nodes for them. - pub prefix_sets: Arc, -} - -impl MultiProofConfig { - /// Creates a new state root config from the trie input. - /// - /// This returns a cleared [`TrieInput`] so that we can reuse any allocated space in the - /// [`TrieInput`]. - pub(super) fn from_input(mut input: TrieInput) -> (TrieInput, Self) { - let config = Self { - nodes_sorted: Arc::new(input.nodes.drain_into_sorted()), - state_sorted: Arc::new(input.state.drain_into_sorted()), - prefix_sets: Arc::new(input.prefix_sets.clone()), - }; - (input.cleared(), config) - } -} - -/// Messages used internally by the multi proof task. -#[derive(Debug)] -pub(super) enum MultiProofMessage { - /// Prefetch proof targets - PrefetchProofs(MultiProofTargets), - /// New state update from transaction execution with its source - StateUpdate(StateChangeSource, EvmState), - /// State update that can be applied to the sparse trie without any new proofs. 
-    ///
-    /// It can be the case when all accounts and storage slots from the state update were already
-    /// fetched and revealed.
-    EmptyProof {
-        /// The index of this proof in the sequence of state updates
-        sequence_number: u64,
-        /// The state update that was used to calculate the proof
-        state: HashedPostState,
-    },
-    /// Signals state update stream end.
-    ///
-    /// This is triggered by block execution, indicating that no additional state updates are
-    /// expected.
-    FinishedStateUpdates,
-}
-
-/// Handle to track proof calculation ordering.
-#[derive(Debug, Default)]
-struct ProofSequencer {
-    /// The next proof sequence number to be produced.
-    next_sequence: u64,
-    /// The next sequence number expected to be delivered.
-    next_to_deliver: u64,
-    /// Buffer for out-of-order proofs and corresponding state updates
-    pending_proofs: BTreeMap<u64, SparseTrieUpdate>,
-}
-
-impl ProofSequencer {
-    /// Gets the next sequence number and increments the counter
-    const fn next_sequence(&mut self) -> u64 {
-        let seq = self.next_sequence;
-        self.next_sequence += 1;
-        seq
-    }
-
-    /// Adds a proof with the corresponding state update and returns all sequential proofs and state
-    /// updates if we have a continuous sequence
-    fn add_proof(&mut self, sequence: u64, update: SparseTrieUpdate) -> Vec<SparseTrieUpdate> {
-        if sequence >= self.next_to_deliver {
-            self.pending_proofs.insert(sequence, update);
-        }
-
-        // return early if we don't have the next expected proof
-        if !self.pending_proofs.contains_key(&self.next_to_deliver) {
-            return Vec::new()
-        }
-
-        let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len());
-        let mut current_sequence = self.next_to_deliver;
-
-        // keep collecting proofs and state updates as long as we have consecutive sequence numbers
-        while let Some(pending) = self.pending_proofs.remove(&current_sequence) {
-            consecutive_proofs.push(pending);
-            current_sequence += 1;
-
-            // if we don't have the next number, stop collecting
-            if !self.pending_proofs.contains_key(&current_sequence) {
-                break;
-            }
-        }
-
-        self.next_to_deliver += consecutive_proofs.len() as u64;
-
-        consecutive_proofs
-    }
-
-    /// Returns true if we still have pending proofs
-    pub(crate) fn has_pending(&self) -> bool {
-        !self.pending_proofs.is_empty()
-    }
-}
-
-/// A wrapper for the sender that signals completion when dropped.
-///
-/// This type is intended to be used in combination with the evm executor statehook.
-/// This should trigger once the block has been executed (after) the last state update has been
-/// sent. This triggers the exit condition of the multi proof task.
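The completion-on-drop contract documented above is easy to exercise in isolation. Below is a minimal sketch using only crossbeam-channel; `Msg` and `HookSender` are hypothetical stand-ins for `MultiProofMessage` and `StateHookSender`, not the real types.

use crossbeam_channel::{unbounded, Sender};

enum Msg {
    Update(u64),
    Finished,
}

struct HookSender(Sender<Msg>);

impl Drop for HookSender {
    fn drop(&mut self) {
        // Mirrors StateHookSender: emit the end-of-stream signal exactly once,
        // when block execution drops its handle after the last state update.
        let _ = self.0.send(Msg::Finished);
    }
}

fn main() {
    let (tx, rx) = unbounded();
    {
        let hook = HookSender(tx);
        hook.0.send(Msg::Update(1)).unwrap();
    } // `hook` dropped here, so `Finished` is sent automatically
    assert!(matches!(rx.recv().unwrap(), Msg::Update(1)));
    assert!(matches!(rx.recv().unwrap(), Msg::Finished));
}

Sending inside `Drop` means no caller can forget the exit signal, which is why the task's shutdown is tied to the sender's lifetime rather than an explicit call.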
-#[derive(Deref, Debug)] -pub(super) struct StateHookSender(CrossbeamSender); - -impl StateHookSender { - pub(crate) const fn new(inner: CrossbeamSender) -> Self { - Self(inner) - } -} - -impl Drop for StateHookSender { - fn drop(&mut self) { - // Send completion signal when the sender is dropped - let _ = self.0.send(MultiProofMessage::FinishedStateUpdates); - } -} - -pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState { - let mut hashed_state = HashedPostState::with_capacity(update.len()); - - for (address, account) in update { - if account.is_touched() { - let hashed_address = keccak256(address); - trace!(target: "engine::tree::payload_processor::multiproof", ?address, ?hashed_address, "Adding account to state update"); - - let destroyed = account.is_selfdestructed(); - let info = if destroyed { None } else { Some(account.info.into()) }; - hashed_state.accounts.insert(hashed_address, info); - - let mut changed_storage_iter = account - .storage - .into_iter() - .filter(|(_slot, value)| value.is_changed()) - .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value)) - .peekable(); - - if destroyed { - hashed_state.storages.insert(hashed_address, HashedStorage::new(true)); - } else if changed_storage_iter.peek().is_some() { - hashed_state - .storages - .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter)); - } - } - } - - hashed_state -} - -/// A pending multiproof task, either [`StorageMultiproofInput`] or [`MultiproofInput`]. -#[derive(Debug)] -enum PendingMultiproofTask { - /// A storage multiproof task input. - Storage(StorageMultiproofInput), - /// A regular multiproof task input. - Regular(MultiproofInput), -} - -impl PendingMultiproofTask { - /// Returns the proof sequence number of the task. - const fn proof_sequence_number(&self) -> u64 { - match self { - Self::Storage(input) => input.proof_sequence_number, - Self::Regular(input) => input.proof_sequence_number, - } - } - - /// Returns whether or not the proof targets are empty. - fn proof_targets_is_empty(&self) -> bool { - match self { - Self::Storage(input) => input.proof_targets.is_empty(), - Self::Regular(input) => input.proof_targets.is_empty(), - } - } - - /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender. - fn send_empty_proof(self) { - match self { - Self::Storage(input) => input.send_empty_proof(), - Self::Regular(input) => input.send_empty_proof(), - } - } -} - -impl From for PendingMultiproofTask { - fn from(input: StorageMultiproofInput) -> Self { - Self::Storage(input) - } -} - -impl From for PendingMultiproofTask { - fn from(input: MultiproofInput) -> Self { - Self::Regular(input) - } -} - -/// Input parameters for dispatching a dedicated storage multiproof calculation. -#[derive(Debug)] -struct StorageMultiproofInput { - hashed_state_update: HashedPostState, - hashed_address: B256, - proof_targets: B256Set, - proof_sequence_number: u64, - state_root_message_sender: CrossbeamSender, - multi_added_removed_keys: Arc, -} - -impl StorageMultiproofInput { - /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender. - fn send_empty_proof(self) { - let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof { - sequence_number: self.proof_sequence_number, - state: self.hashed_state_update, - }); - } -} - -/// Input parameters for dispatching a multiproof calculation. 
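`StorageMultiproofInput` and `MultiproofInput` share the same consume-and-notify shape: the pending task is destroyed and its state update is recycled into an `EmptyProof` message so the sequencer still observes the sequence number. A reduced sketch of that pattern, with `Input` as a hypothetical simplification of either type:

use crossbeam_channel::{unbounded, Sender};

struct Input {
    sequence_number: u64,
    state: Vec<u8>, // stand-in for HashedPostState
    tx: Sender<(u64, Vec<u8>)>,
}

impl Input {
    // Consumes the task and reports it as an empty proof so no sequence
    // number is ever skipped, even when nothing needs to be proven.
    fn send_empty_proof(self) {
        let _ = self.tx.send((self.sequence_number, self.state));
    }
}

fn main() {
    let (tx, rx) = unbounded();
    Input { sequence_number: 3, state: vec![0xab], tx }.send_empty_proof();
    assert_eq!(rx.recv().unwrap(), (3, vec![0xab]));
}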
-#[derive(Debug)] -struct MultiproofInput { - config: MultiProofConfig, - source: Option, - hashed_state_update: HashedPostState, - proof_targets: MultiProofTargets, - proof_sequence_number: u64, - state_root_message_sender: CrossbeamSender, - multi_added_removed_keys: Option>, -} - -impl MultiproofInput { - /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender. - fn send_empty_proof(self) { - let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof { - sequence_number: self.proof_sequence_number, - state: self.hashed_state_update, - }); - } -} - /// Coordinates multiproof dispatch between `MultiProofTask` and the parallel trie workers. /// /// # Flow @@ -516,52 +231,6 @@ impl MultiproofManager { } } -#[derive(Metrics, Clone)] -#[metrics(scope = "tree.root")] -pub(crate) struct MultiProofTaskMetrics { - /// Histogram of inflight multiproofs. - pub inflight_multiproofs_histogram: Histogram, - /// Histogram of pending storage multiproofs in the queue. - pub pending_storage_multiproofs_histogram: Histogram, - /// Histogram of pending account multiproofs in the queue. - pub pending_account_multiproofs_histogram: Histogram, - - /// Histogram of the number of prefetch proof target accounts. - pub prefetch_proof_targets_accounts_histogram: Histogram, - /// Histogram of the number of prefetch proof target storages. - pub prefetch_proof_targets_storages_histogram: Histogram, - /// Histogram of the number of prefetch proof target chunks. - pub prefetch_proof_chunks_histogram: Histogram, - - /// Histogram of the number of state update proof target accounts. - pub state_update_proof_targets_accounts_histogram: Histogram, - /// Histogram of the number of state update proof target storages. - pub state_update_proof_targets_storages_histogram: Histogram, - /// Histogram of the number of state update proof target chunks. - pub state_update_proof_chunks_histogram: Histogram, - - /// Histogram of proof calculation durations. - pub proof_calculation_duration_histogram: Histogram, - - /// Histogram of sparse trie update durations. - pub sparse_trie_update_duration_histogram: Histogram, - /// Histogram of sparse trie final update durations. - pub sparse_trie_final_update_duration_histogram: Histogram, - /// Histogram of sparse trie total durations. - pub sparse_trie_total_duration_histogram: Histogram, - - /// Histogram of state updates received. - pub state_updates_received_histogram: Histogram, - /// Histogram of proofs processed. - pub proofs_processed_histogram: Histogram, - /// Histogram of total time spent in the multiproof task. - pub multiproof_task_total_duration_histogram: Histogram, - /// Total time spent waiting for the first state update or prefetch request. - pub first_update_wait_time_histogram: Histogram, - /// Total time spent waiting for the last proof result. - pub last_proof_wait_time_histogram: Histogram, -} - /// Standalone task that receives a transaction state stream and updates relevant /// data structures to calculate state root. /// @@ -784,7 +453,7 @@ impl MultiProofTask { chunks } - // Returns true if all state updates finished and all proofs processed. + /// Returns true if all state updates finished and all proofs processed. fn is_done( &self, proofs_processed: u64, @@ -1202,6 +871,265 @@ impl MultiProofTask { } } +#[derive(Metrics, Clone)] +#[metrics(scope = "tree.root")] +pub(crate) struct MultiProofTaskMetrics { + /// Histogram of inflight multiproofs. 
+ pub inflight_multiproofs_histogram: Histogram, + /// Histogram of pending storage multiproofs in the queue. + pub pending_storage_multiproofs_histogram: Histogram, + /// Histogram of pending account multiproofs in the queue. + pub pending_account_multiproofs_histogram: Histogram, + + /// Histogram of the number of prefetch proof target accounts. + pub prefetch_proof_targets_accounts_histogram: Histogram, + /// Histogram of the number of prefetch proof target storages. + pub prefetch_proof_targets_storages_histogram: Histogram, + /// Histogram of the number of prefetch proof target chunks. + pub prefetch_proof_chunks_histogram: Histogram, + + /// Histogram of the number of state update proof target accounts. + pub state_update_proof_targets_accounts_histogram: Histogram, + /// Histogram of the number of state update proof target storages. + pub state_update_proof_targets_storages_histogram: Histogram, + /// Histogram of the number of state update proof target chunks. + pub state_update_proof_chunks_histogram: Histogram, + + /// Histogram of proof calculation durations. + pub proof_calculation_duration_histogram: Histogram, + + /// Histogram of sparse trie update durations. + pub sparse_trie_update_duration_histogram: Histogram, + /// Histogram of sparse trie final update durations. + pub sparse_trie_final_update_duration_histogram: Histogram, + /// Histogram of sparse trie total durations. + pub sparse_trie_total_duration_histogram: Histogram, + + /// Histogram of state updates received. + pub state_updates_received_histogram: Histogram, + /// Histogram of proofs processed. + pub proofs_processed_histogram: Histogram, + /// Histogram of total time spent in the multiproof task. + pub multiproof_task_total_duration_histogram: Histogram, + /// Total time spent waiting for the first state update or prefetch request. + pub first_update_wait_time_histogram: Histogram, + /// Total time spent waiting for the last proof result. + pub last_proof_wait_time_histogram: Histogram, +} + +/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the +/// state. +#[derive(Default, Debug)] +pub struct SparseTrieUpdate { + /// The state update that was used to calculate the proof + pub(crate) state: HashedPostState, + /// The calculated multiproof + pub(crate) multiproof: DecodedMultiProof, +} + +impl SparseTrieUpdate { + /// Returns true if the update is empty. + pub(super) fn is_empty(&self) -> bool { + self.state.is_empty() && self.multiproof.is_empty() + } + + /// Construct update from multiproof. + #[cfg(test)] + pub(super) fn from_multiproof(multiproof: reth_trie::MultiProof) -> alloy_rlp::Result { + Ok(Self { multiproof: multiproof.try_into()?, ..Default::default() }) + } + + /// Extend update with contents of the other. + pub(super) fn extend(&mut self, other: Self) { + self.state.extend(other.state); + self.multiproof.extend(other.multiproof); + } +} + +/// Common configuration for multi proof tasks +#[derive(Debug, Clone)] +pub(super) struct MultiProofConfig { + /// The sorted collection of cached in-memory intermediate trie nodes that + /// can be reused for computation. + pub nodes_sorted: Arc, + /// The sorted in-memory overlay hashed state. + pub state_sorted: Arc, + /// The collection of prefix sets for the computation. Since the prefix sets _always_ + /// invalidate the in-memory nodes, not all keys from `state_sorted` might be present here, + /// if we have cached nodes for them. 
+ pub prefix_sets: Arc, +} + +impl MultiProofConfig { + /// Creates a new state root config from the trie input. + /// + /// This returns a cleared [`TrieInput`] so that we can reuse any allocated space in the + /// [`TrieInput`]. + pub(super) fn from_input(mut input: TrieInput) -> (TrieInput, Self) { + let config = Self { + nodes_sorted: Arc::new(input.nodes.drain_into_sorted()), + state_sorted: Arc::new(input.state.drain_into_sorted()), + prefix_sets: Arc::new(input.prefix_sets.clone()), + }; + (input.cleared(), config) + } +} + +/// Messages used internally by the multi proof task. +#[derive(Debug)] +pub(super) enum MultiProofMessage { + /// Prefetch proof targets + PrefetchProofs(MultiProofTargets), + /// New state update from transaction execution with its source + StateUpdate(StateChangeSource, EvmState), + /// State update that can be applied to the sparse trie without any new proofs. + /// + /// It can be the case when all accounts and storage slots from the state update were already + /// fetched and revealed. + EmptyProof { + /// The index of this proof in the sequence of state updates + sequence_number: u64, + /// The state update that was used to calculate the proof + state: HashedPostState, + }, + /// Signals state update stream end. + /// + /// This is triggered by block execution, indicating that no additional state updates are + /// expected. + FinishedStateUpdates, +} + +/// A wrapper for the sender that signals completion when dropped. +/// +/// This type is intended to be used in combination with the evm executor statehook. +/// This should trigger once the block has been executed (after) the last state update has been +/// sent. This triggers the exit condition of the multi proof task. +#[derive(Deref, Debug)] +pub(super) struct StateHookSender(CrossbeamSender); + +impl StateHookSender { + pub(crate) const fn new(inner: CrossbeamSender) -> Self { + Self(inner) + } +} + +impl Drop for StateHookSender { + fn drop(&mut self) { + // Send completion signal when the sender is dropped + let _ = self.0.send(MultiProofMessage::FinishedStateUpdates); + } +} + +/// Handle to track proof calculation ordering. +#[derive(Debug, Default)] +struct ProofSequencer { + /// The next proof sequence number to be produced. + next_sequence: u64, + /// The next sequence number expected to be delivered. 
+    next_to_deliver: u64,
+    /// Buffer for out-of-order proofs and corresponding state updates
+    pending_proofs: BTreeMap<u64, SparseTrieUpdate>,
+}
+
+impl ProofSequencer {
+    /// Gets the next sequence number and increments the counter
+    const fn next_sequence(&mut self) -> u64 {
+        let seq = self.next_sequence;
+        self.next_sequence += 1;
+        seq
+    }
+
+    /// Adds a proof with the corresponding state update and returns all sequential proofs and state
+    /// updates if we have a continuous sequence
+    fn add_proof(&mut self, sequence: u64, update: SparseTrieUpdate) -> Vec<SparseTrieUpdate> {
+        if sequence >= self.next_to_deliver {
+            self.pending_proofs.insert(sequence, update);
+        }
+
+        // return early if we don't have the next expected proof
+        if !self.pending_proofs.contains_key(&self.next_to_deliver) {
+            return Vec::new()
+        }
+
+        let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len());
+        let mut current_sequence = self.next_to_deliver;
+
+        // keep collecting proofs and state updates as long as we have consecutive sequence numbers
+        while let Some(pending) = self.pending_proofs.remove(&current_sequence) {
+            consecutive_proofs.push(pending);
+            current_sequence += 1;
+
+            // if we don't have the next number, stop collecting
+            if !self.pending_proofs.contains_key(&current_sequence) {
+                break;
+            }
+        }
+
+        self.next_to_deliver += consecutive_proofs.len() as u64;
+
+        consecutive_proofs
+    }
+
+    /// Returns true if we still have pending proofs
+    pub(crate) fn has_pending(&self) -> bool {
+        !self.pending_proofs.is_empty()
+    }
+}
+
+pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
+    let mut hashed_state = HashedPostState::with_capacity(update.len());
+
+    for (address, account) in update {
+        if account.is_touched() {
+            let hashed_address = keccak256(address);
+            trace!(target: "engine::tree::payload_processor::multiproof", ?address, ?hashed_address, "Adding account to state update");
+
+            let destroyed = account.is_selfdestructed();
+            let info = if destroyed { None } else { Some(account.info.into()) };
+            hashed_state.accounts.insert(hashed_address, info);
+
+            let mut changed_storage_iter = account
+                .storage
+                .into_iter()
+                .filter(|(_slot, value)| value.is_changed())
+                .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
+                .peekable();
+
+            if destroyed {
+                hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
+            } else if changed_storage_iter.peek().is_some() {
+                hashed_state
+                    .storages
+                    .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
+            }
+        }
+    }
+
+    hashed_state
+}
+
+/// Input parameters for dispatching a multiproof calculation.
+#[derive(Debug)]
+struct MultiproofInput {
+    config: MultiProofConfig,
+    source: Option<StateChangeSource>,
+    hashed_state_update: HashedPostState,
+    proof_targets: MultiProofTargets,
+    proof_sequence_number: u64,
+    state_root_message_sender: CrossbeamSender<MultiProofMessage>,
+    multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
+}
+
+impl MultiproofInput {
+    /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
+    fn send_empty_proof(self) {
+        let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
+            sequence_number: self.proof_sequence_number,
+            state: self.hashed_state_update,
+        });
+    }
+}
+
 /// Returns accounts only with those storages that were not already fetched, and
 /// if there are no such storages and the account itself was already fetched, the
 /// account shouldn't be included.
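The `add_proof` logic moved above has a small, testable contract: buffer out-of-order results and release the longest contiguous run starting at `next_to_deliver`. A self-contained sketch of that contract, with string payloads standing in for `SparseTrieUpdate`:

use std::collections::BTreeMap;

#[derive(Default)]
struct Sequencer {
    next_to_deliver: u64,
    pending: BTreeMap<u64, &'static str>,
}

impl Sequencer {
    fn add_proof(&mut self, sequence: u64, update: &'static str) -> Vec<&'static str> {
        if sequence >= self.next_to_deliver {
            self.pending.insert(sequence, update);
        }
        let mut out = Vec::new();
        // Drain only the contiguous prefix; anything after a gap stays buffered.
        while let Some(update) = self.pending.remove(&self.next_to_deliver) {
            out.push(update);
            self.next_to_deliver += 1;
        }
        out
    }
}

fn main() {
    let mut s = Sequencer::default();
    assert!(s.add_proof(1, "proof-1").is_empty()); // arrives early: buffered
    assert_eq!(s.add_proof(0, "proof-0"), vec!["proof-0", "proof-1"]); // gap closed
}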
From b879126a778e4295ef792f1cf368a67eeb28ae1f Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Mon, 27 Oct 2025 04:42:38 +0000 Subject: [PATCH 2/8] refactor(trie): reorder proof worker handle and task management - Introduced `ProofWorkerHandle` for type-safe access to storage and account worker pools. - Replaced `ProofResultContext` with direct senders for improved performance. - Updated worker loops to handle proof tasks more efficiently, including better tracing and error handling. - Added methods for dispatching storage and account multiproof computations, improving interleaved parallelism. - Enhanced metrics tracking for storage and account processing. --- crates/trie/parallel/src/proof_task.rs | 2217 ++++++++++++------------ 1 file changed, 1126 insertions(+), 1091 deletions(-) diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index c05f2ad7286..f210d1b0ccf 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -77,1255 +77,1290 @@ use std::{ time::{Duration, Instant}, }; use tokio::runtime::Handle; -use tracing::{debug, debug_span, error, trace}; +use tracing::{debug_span, error, trace}; #[cfg(feature = "metrics")] use crate::proof_task_metrics::ProofTaskTrieMetrics; type StorageProofResult = Result; type TrieNodeProviderResult = Result, SparseTrieError>; -type AccountMultiproofResult = - Result<(DecodedMultiProof, ParallelTrieStats), ParallelStateRootError>; -/// Channel used by worker threads to deliver `ProofResultMessage` items back to -/// `MultiProofTask`. -/// -/// Workers use this sender to deliver proof results directly to `MultiProofTask`. -pub type ProofResultSender = CrossbeamSender; - -/// Message containing a completed proof result with metadata for direct delivery to -/// `MultiProofTask`. -/// -/// This type enables workers to send proof results directly to the `MultiProofTask` event loop. -#[derive(Debug)] -pub struct ProofResultMessage { - /// Sequence number for ordering proofs - pub sequence_number: u64, - /// The proof calculation result - pub result: AccountMultiproofResult, - /// Time taken for the entire proof calculation (from dispatch to completion) - pub elapsed: Duration, - /// Original state update that triggered this proof - pub state: HashedPostState, -} - -/// Context for sending proof calculation results back to `MultiProofTask`. +/// A handle that provides type-safe access to proof worker pools. /// -/// This struct contains all context needed to send and track proof calculation results. -/// Workers use this to deliver completed proofs back to the main event loop. +/// The handle stores direct senders to both storage and account worker pools, +/// eliminating the need for a routing thread. All handles share reference-counted +/// channels, and workers shut down gracefully when all handles are dropped. #[derive(Debug, Clone)] -pub struct ProofResultContext { - /// Channel sender for result delivery - pub sender: ProofResultSender, - /// Sequence number for proof ordering - pub sequence_number: u64, - /// Original state update that triggered this proof - pub state: HashedPostState, - /// Calculation start time for measuring elapsed duration - pub start_time: Instant, +pub struct ProofWorkerHandle { + /// Direct sender to storage worker pool + storage_work_tx: CrossbeamSender, + /// Direct sender to account worker pool + account_work_tx: CrossbeamSender, + /// Counter tracking available storage workers. 
Workers decrement when starting work, + /// increment when finishing. Used to determine whether to chunk multiproofs. + storage_available_workers: Arc, + /// Counter tracking available account workers. Workers decrement when starting work, + /// increment when finishing. Used to determine whether to chunk multiproofs. + account_available_workers: Arc, } -impl ProofResultContext { - /// Creates a new proof result context. - pub const fn new( - sender: ProofResultSender, - sequence_number: u64, - state: HashedPostState, - start_time: Instant, - ) -> Self { - Self { sender, sequence_number, state, start_time } - } -} +impl ProofWorkerHandle { + /// Spawns storage and account worker pools with dedicated database transactions. + /// + /// Returns a handle for submitting proof tasks to the worker pools. + /// Workers run until the last handle is dropped. + /// + /// # Parameters + /// - `executor`: Tokio runtime handle for spawning blocking tasks + /// - `view`: Consistent database view for creating transactions + /// - `task_ctx`: Shared context with trie updates and prefix sets + /// - `storage_worker_count`: Number of storage workers to spawn + /// - `account_worker_count`: Number of account workers to spawn + pub fn new( + executor: Handle, + view: ConsistentDbView, + task_ctx: ProofTaskCtx, + storage_worker_count: usize, + account_worker_count: usize, + ) -> Self + where + Factory: DatabaseProviderFactory + Clone + 'static, + { + let (storage_work_tx, storage_work_rx) = unbounded::(); + let (account_work_tx, account_work_rx) = unbounded::(); -/// Internal message for storage workers. -#[derive(Debug)] -enum StorageWorkerJob { - /// Storage proof computation request - StorageProof { - /// Storage proof input parameters - input: StorageProofInput, - /// Context for sending the proof result. - proof_result_sender: ProofResultContext, - }, - /// Blinded storage node retrieval request - BlindedStorageNode { - /// Target account - account: B256, - /// Path to the storage node - path: Nibbles, - /// Channel to send result back to original caller - result_sender: Sender, - }, -} + // Initialize availability counters at zero. Each worker will increment when it + // successfully initializes, ensuring only healthy workers are counted. + let storage_available_workers = Arc::new(AtomicUsize::new(0)); + let account_available_workers = Arc::new(AtomicUsize::new(0)); -/// Worker loop for storage trie operations. -/// -/// # Lifecycle -/// -/// Each worker: -/// 1. Receives `StorageWorkerJob` from crossbeam unbounded channel -/// 2. Computes result using its dedicated long-lived transaction -/// 3. Sends result directly to original caller via `std::mpsc` -/// 4. Repeats until channel closes (graceful shutdown) -/// -/// # Transaction Reuse -/// -/// Reuses the same transaction and cursor factories across multiple operations -/// to avoid transaction creation and cursor factory setup overhead. -/// -/// # Panic Safety -/// -/// If this function panics, the worker thread terminates but other workers -/// continue operating and the system degrades gracefully. -/// -/// # Shutdown -/// -/// Worker shuts down when the crossbeam channel closes (all senders dropped). 
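The availability counters threaded through the spawn loops above follow one pattern: a worker registers itself only after it has initialized successfully, then brackets every job with a decrement/increment pair. A minimal sketch of that gauge, assuming nothing beyond std:

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::thread;

fn worker_loop(available: Arc<AtomicUsize>, jobs: usize) {
    // Register only after (hypothetical) successful initialization, so a
    // worker that failed to open a db transaction is never counted.
    available.fetch_add(1, Ordering::Relaxed);
    for _ in 0..jobs {
        available.fetch_sub(1, Ordering::Relaxed); // busy
        // ... compute a proof here ...
        available.fetch_add(1, Ordering::Relaxed); // idle again
    }
}

fn main() {
    let gauge = Arc::new(AtomicUsize::new(0));
    let handle = thread::spawn({
        let gauge = gauge.clone();
        move || worker_loop(gauge, 3)
    });
    handle.join().unwrap();
    assert_eq!(gauge.load(Ordering::Relaxed), 1); // one idle worker registered
}

`Relaxed` ordering suffices here because the counter is an advisory scheduling hint (whether to chunk multiproofs), not a synchronization point.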
-fn storage_worker_loop( - view: ConsistentDbView, - task_ctx: ProofTaskCtx, - work_rx: CrossbeamReceiver, - worker_id: usize, - available_workers: Arc, - #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics, -) where - Factory: DatabaseProviderFactory, -{ - // Create db transaction before entering work loop - let provider = - view.provider_ro().expect("Storage worker failed to initialize: database unavailable"); - let proof_tx = ProofTaskTx::new(provider.into_tx(), task_ctx, worker_id); + tracing::debug!( + target: "trie::proof_task", + storage_worker_count, + account_worker_count, + "Spawning proof worker pools" + ); - trace!( - target: "trie::proof_task", - worker_id, - "Storage worker started" - ); + let storage_worker_parent = + debug_span!(target: "trie::proof_task", "Storage worker tasks", ?storage_worker_count); + let _guard = storage_worker_parent.enter(); - // Create factories once at worker startup to avoid recreation overhead. - let (trie_cursor_factory, hashed_cursor_factory) = proof_tx.create_factories(); + // Spawn storage workers + for worker_id in 0..storage_worker_count { + let parent_span = debug_span!(target: "trie::proof_task", "Storage worker", ?worker_id); + let view_clone = view.clone(); + let task_ctx_clone = task_ctx.clone(); + let work_rx_clone = storage_work_rx.clone(); + let storage_available_workers_clone = storage_available_workers.clone(); - // Create blinded provider factory once for all blinded node requests - let blinded_provider_factory = ProofTrieNodeProviderFactory::new( - trie_cursor_factory.clone(), - hashed_cursor_factory.clone(), - proof_tx.task_ctx.prefix_sets.clone(), - ); + executor.spawn_blocking(move || { + #[cfg(feature = "metrics")] + let metrics = ProofTaskTrieMetrics::default(); - let mut storage_proofs_processed = 0u64; - let mut storage_nodes_processed = 0u64; + let _guard = parent_span.enter(); + storage_worker_loop( + view_clone, + task_ctx_clone, + work_rx_clone, + worker_id, + storage_available_workers_clone, + #[cfg(feature = "metrics")] + metrics, + ) + }); - // Initially mark this worker as available. - available_workers.fetch_add(1, Ordering::Relaxed); + tracing::debug!( + target: "trie::proof_task", + worker_id, + "Storage worker spawned successfully" + ); + } - while let Ok(job) = work_rx.recv() { - // Mark worker as busy. 
- available_workers.fetch_sub(1, Ordering::Relaxed); + drop(_guard); - match job { - StorageWorkerJob::StorageProof { input, proof_result_sender } => { - let hashed_address = input.hashed_address; - let ProofResultContext { sender, sequence_number: seq, state, start_time } = - proof_result_sender; + let account_worker_parent = + debug_span!(target: "trie::proof_task", "Account worker tasks", ?account_worker_count); + let _guard = account_worker_parent.enter(); - trace!( - target: "trie::proof_task", - worker_id, - hashed_address = ?hashed_address, - prefix_set_len = input.prefix_set.len(), - target_slots_len = input.target_slots.len(), - "Processing storage proof" - ); + // Spawn account workers + for worker_id in 0..account_worker_count { + let parent_span = debug_span!(target: "trie::proof_task", "Account worker", ?worker_id); + let view_clone = view.clone(); + let task_ctx_clone = task_ctx.clone(); + let work_rx_clone = account_work_rx.clone(); + let storage_work_tx_clone = storage_work_tx.clone(); + let account_available_workers_clone = account_available_workers.clone(); - let proof_start = Instant::now(); + executor.spawn_blocking(move || { + #[cfg(feature = "metrics")] + let metrics = ProofTaskTrieMetrics::default(); - let result = proof_tx.compute_storage_proof( - input, - trie_cursor_factory.clone(), - hashed_cursor_factory.clone(), - ); + let _guard = parent_span.enter(); + account_worker_loop( + view_clone, + task_ctx_clone, + work_rx_clone, + storage_work_tx_clone, + worker_id, + account_available_workers_clone, + #[cfg(feature = "metrics")] + metrics, + ) + }); - let proof_elapsed = proof_start.elapsed(); - storage_proofs_processed += 1; + tracing::debug!( + target: "trie::proof_task", + worker_id, + "Account worker spawned successfully" + ); + } - // Convert storage proof to account multiproof format - let result_msg = match result { - Ok(storage_proof) => { - let multiproof = reth_trie::DecodedMultiProof::from_storage_proof( - hashed_address, - storage_proof, - ); - let stats = crate::stats::ParallelTrieTracker::default().finish(); - Ok((multiproof, stats)) - } - Err(e) => Err(e), - }; + drop(_guard); - if sender - .send(ProofResultMessage { - sequence_number: seq, - result: result_msg, - elapsed: start_time.elapsed(), - state, - }) - .is_err() - { - trace!( - target: "trie::proof_task", - worker_id, - hashed_address = ?hashed_address, - storage_proofs_processed, - "Proof result receiver dropped, discarding result" - ); - } + Self::new_handle( + storage_work_tx, + account_work_tx, + storage_available_workers, + account_available_workers, + ) + } - trace!( - target: "trie::proof_task", - worker_id, - hashed_address = ?hashed_address, - proof_time_us = proof_elapsed.as_micros(), - total_processed = storage_proofs_processed, - "Storage proof completed" - ); + /// Creates a new [`ProofWorkerHandle`] with direct access to worker pools. + /// + /// This is an internal constructor used for creating handles. + const fn new_handle( + storage_work_tx: CrossbeamSender, + account_work_tx: CrossbeamSender, + storage_available_workers: Arc, + account_available_workers: Arc, + ) -> Self { + Self { + storage_work_tx, + account_work_tx, + storage_available_workers, + account_available_workers, + } + } - // Mark worker as available again. - available_workers.fetch_add(1, Ordering::Relaxed); - } + /// Returns true if there are available storage workers to process tasks. 
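The `pending_storage_tasks`/`pending_account_tasks` accessors in this block lean on crossbeam's `Sender::len()`, which reports how many messages are currently sitting in the channel. A small demonstration:

use crossbeam_channel::unbounded;

fn main() {
    let (tx, rx) = unbounded::<u32>();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    assert_eq!(tx.len(), 2); // two jobs queued
    rx.recv().unwrap();
    assert_eq!(tx.len(), 1); // one consumed, one still pending
}

Together with the availability counters, this gives callers both sides of the picture: how many workers are idle and how deep the backlog is.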
+ pub fn has_available_storage_workers(&self) -> bool { + self.storage_available_workers.load(Ordering::Relaxed) > 0 + } - StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => { - trace!( - target: "trie::proof_task", - worker_id, - ?account, - ?path, - "Processing blinded storage node" - ); + /// Returns true if there are available account workers to process tasks. + pub fn has_available_account_workers(&self) -> bool { + self.account_available_workers.load(Ordering::Relaxed) > 0 + } - let start = Instant::now(); - let result = - blinded_provider_factory.storage_node_provider(account).trie_node(&path); - let elapsed = start.elapsed(); + /// Returns the number of pending storage tasks in the queue. + pub fn pending_storage_tasks(&self) -> usize { + self.storage_work_tx.len() + } - storage_nodes_processed += 1; + /// Returns the number of pending account tasks in the queue. + pub fn pending_account_tasks(&self) -> usize { + self.account_work_tx.len() + } - if result_sender.send(result).is_err() { - trace!( - target: "trie::proof_task", - worker_id, - ?account, - ?path, - storage_nodes_processed, - "Blinded storage node receiver dropped, discarding result" - ); - } + /// Dispatch a storage proof computation to storage worker pool + /// + /// The result will be sent via the `proof_result_sender` channel. + pub fn dispatch_storage_proof( + &self, + input: StorageProofInput, + proof_result_sender: ProofResultContext, + ) -> Result<(), ProviderError> { + self.storage_work_tx + .send(StorageWorkerJob::StorageProof { input, proof_result_sender }) + .map_err(|err| { + let error = + ProviderError::other(std::io::Error::other("storage workers unavailable")); - trace!( - target: "trie::proof_task", - worker_id, - ?account, - ?path, - elapsed_us = elapsed.as_micros(), - total_processed = storage_nodes_processed, - "Blinded storage node completed" - ); + if let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0 { + let ProofResultContext { + sender: result_tx, + sequence_number: seq, + state, + start_time: start, + } = proof_result_sender; - // Mark worker as available again. - available_workers.fetch_add(1, Ordering::Relaxed); - } - } + let _ = result_tx.send(ProofResultMessage { + sequence_number: seq, + result: Err(ParallelStateRootError::Provider(error.clone())), + elapsed: start.elapsed(), + state, + }); + } + + error + }) } - trace!( - target: "trie::proof_task", - worker_id, - storage_proofs_processed, - storage_nodes_processed, - "Storage worker shutting down" - ); + /// Dispatch an account multiproof computation + /// + /// The result will be sent via the `result_sender` channel included in the input. + pub fn dispatch_account_multiproof( + &self, + input: AccountMultiproofInput, + ) -> Result<(), ProviderError> { + self.account_work_tx + .send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) }) + .map_err(|err| { + let error = + ProviderError::other(std::io::Error::other("account workers unavailable")); - #[cfg(feature = "metrics")] - metrics.record_storage_nodes(storage_nodes_processed as usize); -} + if let AccountWorkerJob::AccountMultiproof { input } = err.0 { + let AccountMultiproofInput { + proof_result_sender: + ProofResultContext { + sender: result_tx, + sequence_number: seq, + state, + start_time: start, + }, + .. + } = *input; -/// Worker loop for account trie operations. 
-/// -/// # Lifecycle -/// -/// Each worker initializes its providers, advertises availability, then loops: -/// take a job, mark busy, compute the proof, send the result, and mark available again. -/// The loop ends gracefully once the channel closes. -/// -/// # Transaction Reuse -/// -/// Reuses the same transaction and cursor factories across multiple operations -/// to avoid transaction creation and cursor factory setup overhead. -/// -/// # Panic Safety -/// -/// If this function panics, the worker thread terminates but other workers -/// continue operating and the system degrades gracefully. -/// -/// # Shutdown -/// -/// Worker shuts down when the crossbeam channel closes (all senders dropped). -fn account_worker_loop( - view: ConsistentDbView, - task_ctx: ProofTaskCtx, - work_rx: CrossbeamReceiver, - storage_work_tx: CrossbeamSender, - worker_id: usize, - available_workers: Arc, - #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics, -) where - Factory: DatabaseProviderFactory, -{ - // Create db transaction before entering work loop - let provider = - view.provider_ro().expect("Account worker failed to initialize: database unavailable"); - let proof_tx = ProofTaskTx::new(provider.into_tx(), task_ctx, worker_id); + let _ = result_tx.send(ProofResultMessage { + sequence_number: seq, + result: Err(ParallelStateRootError::Provider(error.clone())), + elapsed: start.elapsed(), + state, + }); + } - trace!( - target: "trie::proof_task", - worker_id, - "Account worker started" - ); + error + }) + } - // Create factories once at worker startup to avoid recreation overhead. - let (trie_cursor_factory, hashed_cursor_factory) = proof_tx.create_factories(); + /// Dispatch blinded storage node request to storage worker pool + pub(crate) fn dispatch_blinded_storage_node( + &self, + account: B256, + path: Nibbles, + ) -> Result, ProviderError> { + let (tx, rx) = channel(); + self.storage_work_tx + .send(StorageWorkerJob::BlindedStorageNode { account, path, result_sender: tx }) + .map_err(|_| { + ProviderError::other(std::io::Error::other("storage workers unavailable")) + })?; - // Create blinded provider factory once for all blinded node requests - let blinded_provider_factory = ProofTrieNodeProviderFactory::new( - trie_cursor_factory.clone(), - hashed_cursor_factory.clone(), - proof_tx.task_ctx.prefix_sets.clone(), - ); + Ok(rx) + } - let mut account_proofs_processed = 0u64; - let mut account_nodes_processed = 0u64; + /// Dispatch blinded account node request to account worker pool + pub(crate) fn dispatch_blinded_account_node( + &self, + path: Nibbles, + ) -> Result, ProviderError> { + let (tx, rx) = channel(); + self.account_work_tx + .send(AccountWorkerJob::BlindedAccountNode { path, result_sender: tx }) + .map_err(|_| { + ProviderError::other(std::io::Error::other("account workers unavailable")) + })?; - // Count this worker as available only after successful initialization. - available_workers.fetch_add(1, Ordering::Relaxed); + Ok(rx) + } +} - while let Ok(job) = work_rx.recv() { - // Mark worker as busy. - available_workers.fetch_sub(1, Ordering::Relaxed); +/// Data used for initializing cursor factories that is shared across all storage proof instances. +#[derive(Debug, Clone)] +pub struct ProofTaskCtx { + /// The sorted collection of cached in-memory intermediate trie nodes that can be reused for + /// computation. + nodes_sorted: Arc, + /// The sorted in-memory overlay hashed state. + state_sorted: Arc, + /// The collection of prefix sets for the computation. 
Since the prefix sets _always_ + /// invalidate the in-memory nodes, not all keys from `state_sorted` might be present here, + /// if we have cached nodes for them. + prefix_sets: Arc, +} - match job { - AccountWorkerJob::AccountMultiproof { input } => { - let AccountMultiproofInput { - targets, - mut prefix_sets, - collect_branch_node_masks, - multi_added_removed_keys, - missed_leaves_storage_roots, - proof_result_sender: - ProofResultContext { - sender: result_tx, - sequence_number: seq, - state, - start_time: start, - }, - } = *input; - - let span = debug_span!( - target: "trie::proof_task", - "Account multiproof calculation", - targets = targets.len(), - worker_id, - ); - let _span_guard = span.enter(); - - trace!( - target: "trie::proof_task", - "Processing account multiproof" - ); - - let proof_start = Instant::now(); - - let mut tracker = ParallelTrieTracker::default(); - - let mut storage_prefix_sets = std::mem::take(&mut prefix_sets.storage_prefix_sets); - - let storage_root_targets_len = StorageRootTargets::count( - &prefix_sets.account_prefix_set, - &storage_prefix_sets, - ); +impl ProofTaskCtx { + /// Creates a new [`ProofTaskCtx`] with the given sorted nodes and state. + pub const fn new( + nodes_sorted: Arc, + state_sorted: Arc, + prefix_sets: Arc, + ) -> Self { + Self { nodes_sorted, state_sorted, prefix_sets } + } +} - tracker.set_precomputed_storage_roots(storage_root_targets_len as u64); +/// Type alias for the factory tuple returned by [`ProofTaskTx::create_factories`]. +type ProofFactories<'a, Tx> = ( + InMemoryTrieCursorFactory, &'a TrieUpdatesSorted>, + HashedPostStateCursorFactory, &'a HashedPostStateSorted>, +); - let storage_proof_receivers = match dispatch_storage_proofs( - &storage_work_tx, - &targets, - &mut storage_prefix_sets, - collect_branch_node_masks, - multi_added_removed_keys.as_ref(), - ) { - Ok(receivers) => receivers, - Err(error) => { - // Send error through result channel - error!(target: "trie::proof_task", "Failed to dispatch storage proofs: {error}"); - let _ = result_tx.send(ProofResultMessage { - sequence_number: seq, - result: Err(error), - elapsed: start.elapsed(), - state, - }); - continue; - } - }; +/// This contains all information shared between all storage proof instances. +#[derive(Debug)] +pub struct ProofTaskTx { + /// The tx that is reused for proof calculations. + tx: Tx, + /// Trie updates, prefix sets, and state updates + task_ctx: ProofTaskCtx, + /// Identifier for the worker within the worker pool, used only for tracing. + id: usize, +} - // Use the missed leaves cache passed from the multiproof manager - let account_prefix_set = std::mem::take(&mut prefix_sets.account_prefix_set); +impl ProofTaskTx { + /// Initializes a [`ProofTaskTx`] using the given transaction and a [`ProofTaskCtx`]. The id is + /// used only for tracing. 
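Because the worker id exists purely for tracing, every operation attributes its work by entering a span that carries it, as `compute_storage_proof` does further down. A minimal sketch of that span pattern; it assumes the tracing and tracing-subscriber crates as dependencies:

use tracing::debug_span;

fn compute(worker_id: usize) {
    // Everything logged while `_guard` is alive is attributed to this worker.
    let span = debug_span!(target: "trie::proof_task", "Storage proof calculation", worker_id);
    let _guard = span.enter();
    tracing::trace!(target: "trie::proof_task", "Processing storage proof");
}

fn main() {
    tracing_subscriber::fmt().with_max_level(tracing::Level::TRACE).init();
    compute(0);
}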
+ const fn new(tx: Tx, task_ctx: ProofTaskCtx, id: usize) -> Self { + Self { tx, task_ctx, id } + } +} - let ctx = AccountMultiproofParams { - targets: &targets, - prefix_set: account_prefix_set, - collect_branch_node_masks, - multi_added_removed_keys: multi_added_removed_keys.as_ref(), - storage_proof_receivers, - missed_leaves_storage_roots: missed_leaves_storage_roots.as_ref(), - }; +impl ProofTaskTx +where + Tx: DbTx, +{ + #[inline] + fn create_factories(&self) -> ProofFactories<'_, Tx> { + let trie_cursor_factory = InMemoryTrieCursorFactory::new( + DatabaseTrieCursorFactory::new(&self.tx), + self.task_ctx.nodes_sorted.as_ref(), + ); - let result = build_account_multiproof_with_storage_roots( - trie_cursor_factory.clone(), - hashed_cursor_factory.clone(), - ctx, - &mut tracker, - ); + let hashed_cursor_factory = HashedPostStateCursorFactory::new( + DatabaseHashedCursorFactory::new(&self.tx), + self.task_ctx.state_sorted.as_ref(), + ); - let proof_elapsed = proof_start.elapsed(); - let total_elapsed = start.elapsed(); - let stats = tracker.finish(); - let result = result.map(|proof| (proof, stats)); - account_proofs_processed += 1; + (trie_cursor_factory, hashed_cursor_factory) + } - // Send result to MultiProofTask - if result_tx - .send(ProofResultMessage { - sequence_number: seq, - result, - elapsed: total_elapsed, - state, - }) - .is_err() - { - trace!( - target: "trie::proof_task", - worker_id, - account_proofs_processed, - "Account multiproof receiver dropped, discarding result" - ); - } + /// Compute storage proof with pre-created factories. + /// + /// Accepts cursor factories as parameters to allow reuse across multiple proofs. + /// Used by storage workers in the worker pool to avoid factory recreation + /// overhead on each proof computation. + #[inline] + fn compute_storage_proof( + &self, + input: StorageProofInput, + trie_cursor_factory: impl TrieCursorFactory, + hashed_cursor_factory: impl HashedCursorFactory, + ) -> StorageProofResult { + // Consume the input so we can move large collections (e.g. target slots) without cloning. + let StorageProofInput { + hashed_address, + prefix_set, + target_slots, + with_branch_node_masks, + multi_added_removed_keys, + } = input; - trace!( - target: "trie::proof_task", - proof_time_us = proof_elapsed.as_micros(), - total_elapsed_us = total_elapsed.as_micros(), - total_processed = account_proofs_processed, - "Account multiproof completed" - ); - drop(_span_guard); + // Get or create added/removed keys context + let multi_added_removed_keys = + multi_added_removed_keys.unwrap_or_else(|| Arc::new(MultiAddedRemovedKeys::new())); + let added_removed_keys = multi_added_removed_keys.get_storage(&hashed_address); - // Mark worker as available again. 
- available_workers.fetch_add(1, Ordering::Relaxed); - } + let span = tracing::debug_span!( + target: "trie::proof_task", + "Storage proof calculation", + hashed_address = ?hashed_address, + worker_id = self.id, + ); + let _span_guard = span.enter(); - AccountWorkerJob::BlindedAccountNode { path, result_sender } => { - let span = debug_span!( - target: "trie::proof_task", - "Blinded account node calculation", - ?path, - worker_id, - ); - let _span_guard = span.enter(); + let proof_start = Instant::now(); - trace!( - target: "trie::proof_task", - "Processing blinded account node" - ); + // Compute raw storage multiproof + let raw_proof_result = + StorageProof::new_hashed(trie_cursor_factory, hashed_cursor_factory, hashed_address) + .with_prefix_set_mut(PrefixSetMut::from(prefix_set.iter().copied())) + .with_branch_node_masks(with_branch_node_masks) + .with_added_removed_keys(added_removed_keys) + .storage_multiproof(target_slots) + .map_err(|e| ParallelStateRootError::Other(e.to_string())); - let start = Instant::now(); - let result = blinded_provider_factory.account_node_provider().trie_node(&path); - let elapsed = start.elapsed(); + // Decode proof into DecodedStorageMultiProof + let decoded_result = raw_proof_result.and_then(|raw_proof| { + raw_proof.try_into().map_err(|e: alloy_rlp::Error| { + ParallelStateRootError::Other(format!( + "Failed to decode storage proof for {}: {}", + hashed_address, e + )) + }) + }); - account_nodes_processed += 1; + trace!( + target: "trie::proof_task", + hashed_address = ?hashed_address, + proof_time_us = proof_start.elapsed().as_micros(), + worker_id = self.id, + "Completed storage proof calculation" + ); - if result_sender.send(result).is_err() { - trace!( - target: "trie::proof_task", - worker_id, - ?path, - account_nodes_processed, - "Blinded account node receiver dropped, discarding result" - ); - } + decoded_result + } +} - trace!( - target: "trie::proof_task", - node_time_us = elapsed.as_micros(), - total_processed = account_nodes_processed, - "Blinded account node completed" - ); - drop(_span_guard); +impl TrieNodeProviderFactory for ProofWorkerHandle { + type AccountNodeProvider = ProofTaskTrieNodeProvider; + type StorageNodeProvider = ProofTaskTrieNodeProvider; - // Mark worker as available again. - available_workers.fetch_add(1, Ordering::Relaxed); - } - } + fn account_node_provider(&self) -> Self::AccountNodeProvider { + ProofTaskTrieNodeProvider::AccountNode { handle: self.clone() } } - trace!( - target: "trie::proof_task", - worker_id, - account_proofs_processed, - account_nodes_processed, - "Account worker shutting down" - ); - - #[cfg(feature = "metrics")] - metrics.record_account_nodes(account_nodes_processed as usize); + fn storage_node_provider(&self, account: B256) -> Self::StorageNodeProvider { + ProofTaskTrieNodeProvider::StorageNode { account, handle: self.clone() } + } } -/// Builds an account multiproof by consuming storage proof receivers lazily during trie walk. -/// -/// This is a helper function used by account workers to build the account subtree proof -/// while storage proofs are still being computed. Receivers are consumed only when needed, -/// enabling interleaved parallelism between account trie traversal and storage proof computation. +/// Trie node provider for retrieving trie nodes by path. +#[derive(Debug)] +pub enum ProofTaskTrieNodeProvider { + /// Blinded account trie node provider. + AccountNode { + /// Handle to the proof worker pools. + handle: ProofWorkerHandle, + }, + /// Blinded storage trie node provider. 
+ StorageNode { + /// Target account. + account: B256, + /// Handle to the proof worker pools. + handle: ProofWorkerHandle, + }, +} + +impl TrieNodeProvider for ProofTaskTrieNodeProvider { + fn trie_node(&self, path: &Nibbles) -> Result, SparseTrieError> { + match self { + Self::AccountNode { handle } => { + let rx = handle + .dispatch_blinded_account_node(*path) + .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?; + rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))? + } + Self::StorageNode { handle, account } => { + let rx = handle + .dispatch_blinded_storage_node(*account, *path) + .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?; + rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))? + } + } + } +} + +/// Result of a proof calculation, which can be either an account multiproof or a storage proof. +#[derive(Debug)] +pub enum ProofResult { + /// Account multiproof with statistics + AccountMultiproof(DecodedMultiProof, ParallelTrieStats), + /// Storage proof for a specific account + StorageProof { + /// The hashed address this storage proof belongs to + hashed_address: B256, + /// The storage multiproof + proof: DecodedStorageMultiProof, + }, +} + +/// Channel used by worker threads to deliver `ProofResultMessage` items back to +/// `MultiProofTask`. /// -/// Returns a `DecodedMultiProof` containing the account subtree and storage proofs. -fn build_account_multiproof_with_storage_roots( - trie_cursor_factory: C, - hashed_cursor_factory: H, - ctx: AccountMultiproofParams<'_>, - tracker: &mut ParallelTrieTracker, -) -> Result -where - C: TrieCursorFactory + Clone, - H: HashedCursorFactory + Clone, -{ - let accounts_added_removed_keys = - ctx.multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts()); +/// Workers use this sender to deliver proof results directly to `MultiProofTask`. +pub type ProofResultSender = CrossbeamSender; - // Create the walker. - let walker = TrieWalker::<_>::state_trie( - trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?, - ctx.prefix_set, - ) - .with_added_removed_keys(accounts_added_removed_keys) - .with_deletions_retained(true); +/// Message containing a completed proof result with metadata for direct delivery to +/// `MultiProofTask`. +/// +/// This type enables workers to send proof results directly to the `MultiProofTask` event loop. +#[derive(Debug)] +pub struct ProofResultMessage { + /// Sequence number for ordering proofs + pub sequence_number: u64, + /// The proof calculation result (either account multiproof or storage proof) + pub result: Result, + /// Time taken for the entire proof calculation (from dispatch to completion) + pub elapsed: Duration, + /// Original state update that triggered this proof + pub state: HashedPostState, +} - // Create a hash builder to rebuild the root node since it is not available in the database. - let retainer = ctx - .targets - .keys() - .map(Nibbles::unpack) - .collect::() - .with_added_removed_keys(accounts_added_removed_keys); - let mut hash_builder = HashBuilder::default() - .with_proof_retainer(retainer) - .with_updates(ctx.collect_branch_node_masks); +/// Context for sending proof calculation results back to `MultiProofTask`. +/// +/// This struct contains all context needed to send and track proof calculation results. +/// Workers use this to deliver completed proofs back to the main event loop. 
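The `start_time` captured in `ProofResultContext` is what lets workers report end-to-end latency (dispatch to delivery) rather than compute time alone. A reduced sketch of that round trip; `Ctx` is a hypothetical simplification:

use crossbeam_channel::unbounded;
use std::time::{Duration, Instant};

struct Ctx {
    sequence_number: u64,
    start_time: Instant, // captured at dispatch, before any queueing delay
}

fn main() {
    let (tx, rx) = unbounded();
    let ctx = Ctx { sequence_number: 7, start_time: Instant::now() };

    // Worker side: simulate queueing plus compute time, then report elapsed
    // relative to the original dispatch instant.
    std::thread::sleep(Duration::from_millis(5));
    tx.send((ctx.sequence_number, ctx.start_time.elapsed())).unwrap();

    let (seq, elapsed) = rx.recv().unwrap();
    assert!(elapsed >= Duration::from_millis(5));
    println!("proof {seq} took {elapsed:?} end-to-end");
}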
+#[derive(Debug, Clone)] +pub struct ProofResultContext { + /// Channel sender for result delivery + pub sender: ProofResultSender, + /// Sequence number for proof ordering + pub sequence_number: u64, + /// Original state update that triggered this proof + pub state: HashedPostState, + /// Calculation start time for measuring elapsed duration + pub start_time: Instant, +} - // Initialize storage multiproofs map with pre-allocated capacity. - // Proofs will be inserted as they're consumed from receivers during trie walk. - let mut collected_decoded_storages: B256Map = - B256Map::with_capacity_and_hasher(ctx.targets.len(), Default::default()); - let mut account_rlp = Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE); - let mut account_node_iter = TrieNodeIter::state_trie( - walker, - hashed_cursor_factory.hashed_account_cursor().map_err(ProviderError::Database)?, +impl ProofResultContext { + /// Creates a new proof result context. + pub const fn new( + sender: ProofResultSender, + sequence_number: u64, + state: HashedPostState, + start_time: Instant, + ) -> Self { + Self { sender, sequence_number, state, start_time } + } +} + +/// Internal message for storage workers. +#[derive(Debug)] +enum StorageWorkerJob { + /// Storage proof computation request + StorageProof { + /// Storage proof input parameters + input: StorageProofInput, + /// Context for sending the proof result. + proof_result_sender: ProofResultContext, + }, + /// Blinded storage node retrieval request + BlindedStorageNode { + /// Target account + account: B256, + /// Path to the storage node + path: Nibbles, + /// Channel to send result back to original caller + result_sender: Sender, + }, +} + +/// Worker loop for storage trie operations. +/// +/// # Lifecycle +/// +/// Each worker: +/// 1. Receives `StorageWorkerJob` from crossbeam unbounded channel +/// 2. Computes result using its dedicated long-lived transaction +/// 3. Sends result directly to original caller via `std::mpsc` +/// 4. Repeats until channel closes (graceful shutdown) +/// +/// # Transaction Reuse +/// +/// Reuses the same transaction and cursor factories across multiple operations +/// to avoid transaction creation and cursor factory setup overhead. +/// +/// # Panic Safety +/// +/// If this function panics, the worker thread terminates but other workers +/// continue operating and the system degrades gracefully. +/// +/// # Shutdown +/// +/// Worker shuts down when the crossbeam channel closes (all senders dropped). +fn storage_worker_loop( + view: ConsistentDbView, + task_ctx: ProofTaskCtx, + work_rx: CrossbeamReceiver, + worker_id: usize, + available_workers: Arc, + #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics, +) where + Factory: DatabaseProviderFactory, +{ + // Create db transaction before entering work loop + let provider = + view.provider_ro().expect("Storage worker failed to initialize: database unavailable"); + let proof_tx = ProofTaskTx::new(provider.into_tx(), task_ctx, worker_id); + + trace!( + target: "trie::proof_task", + worker_id, + "Storage worker started" ); - let mut storage_proof_receivers = ctx.storage_proof_receivers; + // Create factories once at worker startup to avoid recreation overhead. + let (trie_cursor_factory, hashed_cursor_factory) = proof_tx.create_factories(); - while let Some(account_node) = account_node_iter.try_next().map_err(ProviderError::Database)? 
{ - match account_node { - TrieElement::Branch(node) => { - hash_builder.add_branch(node.key, node.value, node.children_are_in_trie); - } - TrieElement::Leaf(hashed_address, account) => { - let root = match storage_proof_receivers.remove(&hashed_address) { - Some(receiver) => { - // Block on this specific storage proof receiver - enables interleaved - // parallelism - let proof_msg = receiver.recv().map_err(|_| { - ParallelStateRootError::StorageRoot( - reth_execution_errors::StorageRootError::Database( - DatabaseError::Other(format!( - "Storage proof channel closed for {hashed_address}" - )), - ), - ) - })?; + // Create blinded provider factory once for all blinded node requests + let blinded_provider_factory = ProofTrieNodeProviderFactory::new( + trie_cursor_factory.clone(), + hashed_cursor_factory.clone(), + proof_tx.task_ctx.prefix_sets.clone(), + ); - // Extract storage proof from the multiproof wrapper - let (mut multiproof, _stats) = proof_msg.result?; - let proof = - multiproof.storages.remove(&hashed_address).ok_or_else(|| { - ParallelStateRootError::Other(format!( - "storage proof not found in multiproof for {hashed_address}" - )) - })?; + let mut storage_proofs_processed = 0u64; + let mut storage_nodes_processed = 0u64; - let root = proof.root; - collected_decoded_storages.insert(hashed_address, proof); - root - } - // Since we do not store all intermediate nodes in the database, there might - // be a possibility of re-adding a non-modified leaf to the hash builder. - None => { - tracker.inc_missed_leaves(); + // Initially mark this worker as available. + available_workers.fetch_add(1, Ordering::Relaxed); - match ctx.missed_leaves_storage_roots.entry(hashed_address) { - dashmap::Entry::Occupied(occ) => *occ.get(), - dashmap::Entry::Vacant(vac) => { - let root = StorageProof::new_hashed( - trie_cursor_factory.clone(), - hashed_cursor_factory.clone(), - hashed_address, - ) - .with_prefix_set_mut(Default::default()) - .storage_multiproof( - ctx.targets.get(&hashed_address).cloned().unwrap_or_default(), - ) - .map_err(|e| { - ParallelStateRootError::StorageRoot( - reth_execution_errors::StorageRootError::Database( - DatabaseError::Other(e.to_string()), - ), - ) - })? - .root; + while let Ok(job) = work_rx.recv() { + // Mark worker as busy. + available_workers.fetch_sub(1, Ordering::Relaxed); - vac.insert(root); - root - } - } - } - }; + match job { + StorageWorkerJob::StorageProof { input, proof_result_sender } => { + let hashed_address = input.hashed_address; + let ProofResultContext { sender, sequence_number: seq, state, start_time } = + proof_result_sender; - // Encode account - account_rlp.clear(); - let account = account.into_trie_account(root); - account.encode(&mut account_rlp as &mut dyn BufMut); + trace!( + target: "trie::proof_task", + worker_id, + hashed_address = ?hashed_address, + prefix_set_len = input.prefix_set.len(), + target_slots_len = input.target_slots.len(), + "Processing storage proof" + ); - hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp); - } - } - } + let proof_start = Instant::now(); - // Consume remaining storage proof receivers for accounts not encountered during trie walk. 
- for (hashed_address, receiver) in storage_proof_receivers { - if let Ok(proof_msg) = receiver.recv() { - // Extract storage proof from the multiproof wrapper - if let Ok((mut multiproof, _stats)) = proof_msg.result && - let Some(proof) = multiproof.storages.remove(&hashed_address) - { - collected_decoded_storages.insert(hashed_address, proof); - } - } - } + let result = proof_tx.compute_storage_proof( + input, + trie_cursor_factory.clone(), + hashed_cursor_factory.clone(), + ); - let _ = hash_builder.root(); + let proof_elapsed = proof_start.elapsed(); + storage_proofs_processed += 1; - let account_subtree_raw_nodes = hash_builder.take_proof_nodes(); - let decoded_account_subtree = DecodedProofNodes::try_from(account_subtree_raw_nodes)?; + let result_msg = result.map(|storage_proof| ProofResult::StorageProof { + hashed_address, + proof: storage_proof, + }); - let (branch_node_hash_masks, branch_node_tree_masks) = if ctx.collect_branch_node_masks { - let updated_branch_nodes = hash_builder.updated_branch_nodes.unwrap_or_default(); - ( - updated_branch_nodes.iter().map(|(path, node)| (*path, node.hash_mask)).collect(), - updated_branch_nodes.into_iter().map(|(path, node)| (path, node.tree_mask)).collect(), - ) - } else { - (Default::default(), Default::default()) - }; + if sender + .send(ProofResultMessage { + sequence_number: seq, + result: result_msg, + elapsed: start_time.elapsed(), + state, + }) + .is_err() + { + trace!( + target: "trie::proof_task", + worker_id, + hashed_address = ?hashed_address, + storage_proofs_processed, + "Proof result receiver dropped, discarding result" + ); + } + + trace!( + target: "trie::proof_task", + worker_id, + hashed_address = ?hashed_address, + proof_time_us = proof_elapsed.as_micros(), + total_processed = storage_proofs_processed, + "Storage proof completed" + ); - Ok(DecodedMultiProof { - account_subtree: decoded_account_subtree, - branch_node_hash_masks, - branch_node_tree_masks, - storages: collected_decoded_storages, - }) -} + // Mark worker as available again. + available_workers.fetch_add(1, Ordering::Relaxed); + } -/// Queues storage proofs for all accounts in the targets and returns receivers. -/// -/// This function queues all storage proof tasks to the worker pool but returns immediately -/// with receivers, allowing the account trie walk to proceed in parallel with storage proof -/// computation. This enables interleaved parallelism for better performance. -/// -/// Propagates errors up if queuing fails. Receivers must be consumed by the caller. 
-fn dispatch_storage_proofs( - storage_work_tx: &CrossbeamSender, - targets: &MultiProofTargets, - storage_prefix_sets: &mut B256Map, - with_branch_node_masks: bool, - multi_added_removed_keys: Option<&Arc>, -) -> Result>, ParallelStateRootError> { - let mut storage_proof_receivers = - B256Map::with_capacity_and_hasher(targets.len(), Default::default()); + StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => { + trace!( + target: "trie::proof_task", + worker_id, + ?account, + ?path, + "Processing blinded storage node" + ); - // Dispatch all storage proofs to worker pool - for (hashed_address, target_slots) in targets.iter() { - let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default(); + let start = Instant::now(); + let result = + blinded_provider_factory.storage_node_provider(account).trie_node(&path); + let elapsed = start.elapsed(); - // Create channel for receiving ProofResultMessage - let (result_tx, result_rx) = crossbeam_channel::unbounded(); - let start = Instant::now(); + storage_nodes_processed += 1; - // Create computation input (data only, no communication channel) - let input = StorageProofInput::new( - *hashed_address, - prefix_set, - target_slots.clone(), - with_branch_node_masks, - multi_added_removed_keys.cloned(), - ); + if result_sender.send(result).is_err() { + trace!( + target: "trie::proof_task", + worker_id, + ?account, + ?path, + storage_nodes_processed, + "Blinded storage node receiver dropped, discarding result" + ); + } - // Always dispatch a storage proof so we obtain the storage root even when no slots are - // requested. - storage_work_tx - .send(StorageWorkerJob::StorageProof { - input, - proof_result_sender: ProofResultContext::new( - result_tx, - 0, - HashedPostState::default(), - start, - ), - }) - .map_err(|_| { - ParallelStateRootError::Other(format!( - "Failed to queue storage proof for {}: storage worker pool unavailable", - hashed_address - )) - })?; + trace!( + target: "trie::proof_task", + worker_id, + ?account, + ?path, + elapsed_us = elapsed.as_micros(), + total_processed = storage_nodes_processed, + "Blinded storage node completed" + ); - storage_proof_receivers.insert(*hashed_address, result_rx); + // Mark worker as available again. + available_workers.fetch_add(1, Ordering::Relaxed); + } + } } - Ok(storage_proof_receivers) -} - -/// Type alias for the factory tuple returned by `create_factories` -type ProofFactories<'a, Tx> = ( - InMemoryTrieCursorFactory, &'a TrieUpdatesSorted>, - HashedPostStateCursorFactory, &'a HashedPostStateSorted>, -); + trace!( + target: "trie::proof_task", + worker_id, + storage_proofs_processed, + storage_nodes_processed, + "Storage worker shutting down" + ); -/// This contains all information shared between all storage proof instances. -#[derive(Debug)] -pub struct ProofTaskTx { - /// The tx that is reused for proof calculations. - tx: Tx, + #[cfg(feature = "metrics")] + metrics.record_storage_nodes(storage_nodes_processed as usize); +} - /// Trie updates, prefix sets, and state updates +/// Worker loop for account trie operations. +/// +/// # Lifecycle +/// +/// Each worker initializes its providers, advertises availability, then loops: +/// take a job, mark busy, compute the proof, send the result, and mark available again. +/// The loop ends gracefully once the channel closes. +/// +/// # Transaction Reuse +/// +/// Reuses the same transaction and cursor factories across multiple operations +/// to avoid transaction creation and cursor factory setup overhead. 
+/// +/// # Panic Safety +/// +/// If this function panics, the worker thread terminates but other workers +/// continue operating and the system degrades gracefully. +/// +/// # Shutdown +/// +/// Worker shuts down when the crossbeam channel closes (all senders dropped). +fn account_worker_loop( + view: ConsistentDbView, task_ctx: ProofTaskCtx, + work_rx: CrossbeamReceiver, + storage_work_tx: CrossbeamSender, + worker_id: usize, + available_workers: Arc, + #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics, +) where + Factory: DatabaseProviderFactory, +{ + // Create db transaction before entering work loop + let provider = + view.provider_ro().expect("Account worker failed to initialize: database unavailable"); + let proof_tx = ProofTaskTx::new(provider.into_tx(), task_ctx, worker_id); - /// Identifier for the worker within the worker pool, used only for tracing. - id: usize, -} + trace!( + target: "trie::proof_task", + worker_id, + "Account worker started" + ); -impl ProofTaskTx { - /// Initializes a [`ProofTaskTx`] using the given transaction and a [`ProofTaskCtx`]. The id is - /// used only for tracing. - const fn new(tx: Tx, task_ctx: ProofTaskCtx, id: usize) -> Self { - Self { tx, task_ctx, id } - } -} + // Create factories once at worker startup to avoid recreation overhead. + let (trie_cursor_factory, hashed_cursor_factory) = proof_tx.create_factories(); -impl ProofTaskTx -where - Tx: DbTx, -{ - #[inline] - fn create_factories(&self) -> ProofFactories<'_, Tx> { - let trie_cursor_factory = InMemoryTrieCursorFactory::new( - DatabaseTrieCursorFactory::new(&self.tx), - self.task_ctx.nodes_sorted.as_ref(), - ); + // Create blinded provider factory once for all blinded node requests + let blinded_provider_factory = ProofTrieNodeProviderFactory::new( + trie_cursor_factory.clone(), + hashed_cursor_factory.clone(), + proof_tx.task_ctx.prefix_sets.clone(), + ); - let hashed_cursor_factory = HashedPostStateCursorFactory::new( - DatabaseHashedCursorFactory::new(&self.tx), - self.task_ctx.state_sorted.as_ref(), - ); + let mut account_proofs_processed = 0u64; + let mut account_nodes_processed = 0u64; - (trie_cursor_factory, hashed_cursor_factory) - } + // Count this worker as available only after successful initialization. + available_workers.fetch_add(1, Ordering::Relaxed); - /// Compute storage proof with pre-created factories. - /// - /// Accepts cursor factories as parameters to allow reuse across multiple proofs. - /// Used by storage workers in the worker pool to avoid factory recreation - /// overhead on each proof computation. - #[inline] - fn compute_storage_proof( - &self, - input: StorageProofInput, - trie_cursor_factory: impl TrieCursorFactory, - hashed_cursor_factory: impl HashedCursorFactory, - ) -> StorageProofResult { - // Consume the input so we can move large collections (e.g. target slots) without cloning. - let StorageProofInput { - hashed_address, - prefix_set, - target_slots, - with_branch_node_masks, - multi_added_removed_keys, - } = input; + while let Ok(job) = work_rx.recv() { + // Mark worker as busy. 
+ available_workers.fetch_sub(1, Ordering::Relaxed); - // Get or create added/removed keys context - let multi_added_removed_keys = - multi_added_removed_keys.unwrap_or_else(|| Arc::new(MultiAddedRemovedKeys::new())); - let added_removed_keys = multi_added_removed_keys.get_storage(&hashed_address); + match job { + AccountWorkerJob::AccountMultiproof { input } => { + let AccountMultiproofInput { + targets, + mut prefix_sets, + collect_branch_node_masks, + multi_added_removed_keys, + missed_leaves_storage_roots, + proof_result_sender: + ProofResultContext { + sender: result_tx, + sequence_number: seq, + state, + start_time: start, + }, + } = *input; - let span = debug_span!( - target: "trie::proof_task", - "Storage proof calculation", - hashed_address = ?hashed_address, - worker_id = self.id, - ); - let _span_guard = span.enter(); + let span = debug_span!( + target: "trie::proof_task", + "Account multiproof calculation", + targets = targets.len(), + worker_id, + ); + let _span_guard = span.enter(); + + trace!( + target: "trie::proof_task", + "Processing account multiproof" + ); - let proof_start = Instant::now(); + let proof_start = Instant::now(); - // Compute raw storage multiproof - let raw_proof_result = - StorageProof::new_hashed(trie_cursor_factory, hashed_cursor_factory, hashed_address) - .with_prefix_set_mut(PrefixSetMut::from(prefix_set.iter().copied())) - .with_branch_node_masks(with_branch_node_masks) - .with_added_removed_keys(added_removed_keys) - .storage_multiproof(target_slots) - .map_err(|e| ParallelStateRootError::Other(e.to_string())); + let mut tracker = ParallelTrieTracker::default(); - // Decode proof into DecodedStorageMultiProof - let decoded_result = raw_proof_result.and_then(|raw_proof| { - raw_proof.try_into().map_err(|e: alloy_rlp::Error| { - ParallelStateRootError::Other(format!( - "Failed to decode storage proof for {}: {}", - hashed_address, e - )) - }) - }); + let mut storage_prefix_sets = std::mem::take(&mut prefix_sets.storage_prefix_sets); - trace!( - target: "trie::proof_task", - hashed_address = ?hashed_address, - proof_time_us = proof_start.elapsed().as_micros(), - worker_id = self.id, - "Completed storage proof calculation" - ); + let storage_root_targets_len = StorageRootTargets::count( + &prefix_sets.account_prefix_set, + &storage_prefix_sets, + ); - decoded_result - } -} + tracker.set_precomputed_storage_roots(storage_root_targets_len as u64); -/// Input parameters for storage proof computation. -#[derive(Debug)] -pub struct StorageProofInput { - /// The hashed address for which the proof is calculated. - hashed_address: B256, - /// The prefix set for the proof calculation. - prefix_set: PrefixSet, - /// The target slots for the proof calculation. - target_slots: B256Set, - /// Whether or not to collect branch node masks - with_branch_node_masks: bool, - /// Provided by the user to give the necessary context to retain extra proofs. 
- multi_added_removed_keys: Option>, -} + let storage_proof_receivers = match dispatch_storage_proofs( + &storage_work_tx, + &targets, + &mut storage_prefix_sets, + collect_branch_node_masks, + multi_added_removed_keys.as_ref(), + ) { + Ok(receivers) => receivers, + Err(error) => { + // Send error through result channel + error!(target: "trie::proof_task", "Failed to dispatch storage proofs: {error}"); + let _ = result_tx.send(ProofResultMessage { + sequence_number: seq, + result: Err(error), + elapsed: start.elapsed(), + state, + }); + continue; + } + }; -impl StorageProofInput { - /// Creates a new [`StorageProofInput`] with the given hashed address, prefix set, and target - /// slots. - pub const fn new( - hashed_address: B256, - prefix_set: PrefixSet, - target_slots: B256Set, - with_branch_node_masks: bool, - multi_added_removed_keys: Option>, - ) -> Self { - Self { - hashed_address, - prefix_set, - target_slots, - with_branch_node_masks, - multi_added_removed_keys, - } - } -} + // Use the missed leaves cache passed from the multiproof manager + let account_prefix_set = std::mem::take(&mut prefix_sets.account_prefix_set); -/// Input parameters for account multiproof computation. -#[derive(Debug, Clone)] -pub struct AccountMultiproofInput { - /// The targets for which to compute the multiproof. - pub targets: MultiProofTargets, - /// The prefix sets for the proof calculation. - pub prefix_sets: TriePrefixSets, - /// Whether or not to collect branch node masks. - pub collect_branch_node_masks: bool, - /// Provided by the user to give the necessary context to retain extra proofs. - pub multi_added_removed_keys: Option>, - /// Cached storage proof roots for missed leaves encountered during account trie walk. - pub missed_leaves_storage_roots: Arc>, - /// Context for sending the proof result. - pub proof_result_sender: ProofResultContext, -} + let ctx = AccountMultiproofParams { + targets: &targets, + prefix_set: account_prefix_set, + collect_branch_node_masks, + multi_added_removed_keys: multi_added_removed_keys.as_ref(), + storage_proof_receivers, + missed_leaves_storage_roots: missed_leaves_storage_roots.as_ref(), + }; -/// Parameters for building an account multiproof with pre-computed storage roots. -struct AccountMultiproofParams<'a> { - /// The targets for which to compute the multiproof. - targets: &'a MultiProofTargets, - /// The prefix set for the account trie walk. - prefix_set: PrefixSet, - /// Whether or not to collect branch node masks. - collect_branch_node_masks: bool, - /// Provided by the user to give the necessary context to retain extra proofs. - multi_added_removed_keys: Option<&'a Arc>, - /// Receivers for storage proofs being computed in parallel. - storage_proof_receivers: B256Map>, - /// Cached storage proof roots for missed leaves encountered during account trie walk. - missed_leaves_storage_roots: &'a DashMap, -} + let result = build_account_multiproof_with_storage_roots( + trie_cursor_factory.clone(), + hashed_cursor_factory.clone(), + ctx, + &mut tracker, + ); -/// Internal message for account workers. 
-#[derive(Debug)] -enum AccountWorkerJob { - /// Account multiproof computation request - AccountMultiproof { - /// Account multiproof input parameters - input: Box, - }, - /// Blinded account node retrieval request - BlindedAccountNode { - /// Path to the account node - path: Nibbles, - /// Channel to send result back to original caller - result_sender: Sender, - }, -} + let proof_elapsed = proof_start.elapsed(); + let total_elapsed = start.elapsed(); + let stats = tracker.finish(); + let result = result.map(|proof| ProofResult::AccountMultiproof(proof, stats)); + account_proofs_processed += 1; -/// Data used for initializing cursor factories that is shared across all storage proof instances. -#[derive(Debug, Clone)] -pub struct ProofTaskCtx { - /// The sorted collection of cached in-memory intermediate trie nodes that can be reused for - /// computation. - nodes_sorted: Arc, - /// The sorted in-memory overlay hashed state. - state_sorted: Arc, - /// The collection of prefix sets for the computation. Since the prefix sets _always_ - /// invalidate the in-memory nodes, not all keys from `state_sorted` might be present here, - /// if we have cached nodes for them. - prefix_sets: Arc, -} + // Send result to MultiProofTask + if result_tx + .send(ProofResultMessage { + sequence_number: seq, + result, + elapsed: total_elapsed, + state, + }) + .is_err() + { + trace!( + target: "trie::proof_task", + worker_id, + account_proofs_processed, + "Account multiproof receiver dropped, discarding result" + ); + } -impl ProofTaskCtx { - /// Creates a new [`ProofTaskCtx`] with the given sorted nodes and state. - pub const fn new( - nodes_sorted: Arc, - state_sorted: Arc, - prefix_sets: Arc, - ) -> Self { - Self { nodes_sorted, state_sorted, prefix_sets } - } -} + trace!( + target: "trie::proof_task", + proof_time_us = proof_elapsed.as_micros(), + total_elapsed_us = total_elapsed.as_micros(), + total_processed = account_proofs_processed, + "Account multiproof completed" + ); + drop(_span_guard); -/// A handle that provides type-safe access to proof worker pools. -/// -/// The handle stores direct senders to both storage and account worker pools, -/// eliminating the need for a routing thread. All handles share reference-counted -/// channels, and workers shut down gracefully when all handles are dropped. -#[derive(Debug, Clone)] -pub struct ProofWorkerHandle { - /// Direct sender to storage worker pool - storage_work_tx: CrossbeamSender, - /// Direct sender to account worker pool - account_work_tx: CrossbeamSender, - /// Counter tracking available storage workers. Workers decrement when starting work, - /// increment when finishing. Used to determine whether to chunk multiproofs. - storage_available_workers: Arc, - /// Counter tracking available account workers. Workers decrement when starting work, - /// increment when finishing. Used to determine whether to chunk multiproofs. - account_available_workers: Arc, -} + // Mark worker as available again. + available_workers.fetch_add(1, Ordering::Relaxed); + } -impl ProofWorkerHandle { - /// Spawns storage and account worker pools with dedicated database transactions. - /// - /// Returns a handle for submitting proof tasks to the worker pools. - /// Workers run until the last handle is dropped. 
- /// - /// # Parameters - /// - `executor`: Tokio runtime handle for spawning blocking tasks - /// - `view`: Consistent database view for creating transactions - /// - `task_ctx`: Shared context with trie updates and prefix sets - /// - `storage_worker_count`: Number of storage workers to spawn - /// - `account_worker_count`: Number of account workers to spawn - pub fn new( - executor: Handle, - view: ConsistentDbView, - task_ctx: ProofTaskCtx, - storage_worker_count: usize, - account_worker_count: usize, - ) -> Self - where - Factory: DatabaseProviderFactory + Clone + 'static, - { - let (storage_work_tx, storage_work_rx) = unbounded::(); - let (account_work_tx, account_work_rx) = unbounded::(); + AccountWorkerJob::BlindedAccountNode { path, result_sender } => { + let span = debug_span!( + target: "trie::proof_task", + "Blinded account node calculation", + ?path, + worker_id, + ); + let _span_guard = span.enter(); - // Initialize availability counters at zero. Each worker will increment when it - // successfully initializes, ensuring only healthy workers are counted. - let storage_available_workers = Arc::new(AtomicUsize::new(0)); - let account_available_workers = Arc::new(AtomicUsize::new(0)); + trace!( + target: "trie::proof_task", + "Processing blinded account node" + ); - debug!( - target: "trie::proof_task", - storage_worker_count, - account_worker_count, - "Spawning proof worker pools" - ); + let start = Instant::now(); + let result = blinded_provider_factory.account_node_provider().trie_node(&path); + let elapsed = start.elapsed(); - let parent_span = - debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count) - .entered(); - // Spawn storage workers - for worker_id in 0..storage_worker_count { - let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id); - let view_clone = view.clone(); - let task_ctx_clone = task_ctx.clone(); - let work_rx_clone = storage_work_rx.clone(); - let storage_available_workers_clone = storage_available_workers.clone(); + account_nodes_processed += 1; - executor.spawn_blocking(move || { - #[cfg(feature = "metrics")] - let metrics = ProofTaskTrieMetrics::default(); + if result_sender.send(result).is_err() { + trace!( + target: "trie::proof_task", + worker_id, + ?path, + account_nodes_processed, + "Blinded account node receiver dropped, discarding result" + ); + } - let _guard = span.enter(); - storage_worker_loop( - view_clone, - task_ctx_clone, - work_rx_clone, - worker_id, - storage_available_workers_clone, - #[cfg(feature = "metrics")] - metrics, - ) - }); + trace!( + target: "trie::proof_task", + node_time_us = elapsed.as_micros(), + total_processed = account_nodes_processed, + "Blinded account node completed" + ); + drop(_span_guard); + + // Mark worker as available again. 
+ available_workers.fetch_add(1, Ordering::Relaxed); + } } - drop(parent_span); + } - let parent_span = - debug_span!(target: "trie::proof_task", "account proof workers", ?storage_worker_count) - .entered(); - // Spawn account workers - for worker_id in 0..account_worker_count { - let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id); - let view_clone = view.clone(); - let task_ctx_clone = task_ctx.clone(); - let work_rx_clone = account_work_rx.clone(); - let storage_work_tx_clone = storage_work_tx.clone(); - let account_available_workers_clone = account_available_workers.clone(); + trace!( + target: "trie::proof_task", + worker_id, + account_proofs_processed, + account_nodes_processed, + "Account worker shutting down" + ); - executor.spawn_blocking(move || { - #[cfg(feature = "metrics")] - let metrics = ProofTaskTrieMetrics::default(); + #[cfg(feature = "metrics")] + metrics.record_account_nodes(account_nodes_processed as usize); +} - let _guard = span.enter(); - account_worker_loop( - view_clone, - task_ctx_clone, - work_rx_clone, - storage_work_tx_clone, - worker_id, - account_available_workers_clone, - #[cfg(feature = "metrics")] - metrics, - ) - }); - } - drop(parent_span); +/// Builds an account multiproof by consuming storage proof receivers lazily during trie walk. +/// +/// This is a helper function used by account workers to build the account subtree proof +/// while storage proofs are still being computed. Receivers are consumed only when needed, +/// enabling interleaved parallelism between account trie traversal and storage proof computation. +/// +/// Returns a `DecodedMultiProof` containing the account subtree and storage proofs. +fn build_account_multiproof_with_storage_roots( + trie_cursor_factory: C, + hashed_cursor_factory: H, + ctx: AccountMultiproofParams<'_>, + tracker: &mut ParallelTrieTracker, +) -> Result +where + C: TrieCursorFactory + Clone, + H: HashedCursorFactory + Clone, +{ + let accounts_added_removed_keys = + ctx.multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts()); - Self { - storage_work_tx, - account_work_tx, - storage_available_workers, - account_available_workers, - } - } + // Create the walker. + let walker = TrieWalker::<_>::state_trie( + trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?, + ctx.prefix_set, + ) + .with_added_removed_keys(accounts_added_removed_keys) + .with_deletions_retained(true); - /// Returns true if there are available storage workers to process tasks. - pub fn has_available_storage_workers(&self) -> bool { - self.storage_available_workers.load(Ordering::Relaxed) > 0 - } + // Create a hash builder to rebuild the root node since it is not available in the database. + let retainer = ctx + .targets + .keys() + .map(Nibbles::unpack) + .collect::() + .with_added_removed_keys(accounts_added_removed_keys); + let mut hash_builder = HashBuilder::default() + .with_proof_retainer(retainer) + .with_updates(ctx.collect_branch_node_masks); - /// Returns true if there are available account workers to process tasks. - pub fn has_available_account_workers(&self) -> bool { - self.account_available_workers.load(Ordering::Relaxed) > 0 - } + // Initialize storage multiproofs map with pre-allocated capacity. + // Proofs will be inserted as they're consumed from receivers during trie walk. 
+ let mut collected_decoded_storages: B256Map = + B256Map::with_capacity_and_hasher(ctx.targets.len(), Default::default()); + let mut account_rlp = Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE); + let mut account_node_iter = TrieNodeIter::state_trie( + walker, + hashed_cursor_factory.hashed_account_cursor().map_err(ProviderError::Database)?, + ); - /// Returns the number of pending storage tasks in the queue. - pub fn pending_storage_tasks(&self) -> usize { - self.storage_work_tx.len() - } + let mut storage_proof_receivers = ctx.storage_proof_receivers; - /// Returns the number of pending account tasks in the queue. - pub fn pending_account_tasks(&self) -> usize { - self.account_work_tx.len() - } + while let Some(account_node) = account_node_iter.try_next().map_err(ProviderError::Database)? { + match account_node { + TrieElement::Branch(node) => { + hash_builder.add_branch(node.key, node.value, node.children_are_in_trie); + } + TrieElement::Leaf(hashed_address, account) => { + let root = match storage_proof_receivers.remove(&hashed_address) { + Some(receiver) => { + // Block on this specific storage proof receiver - enables interleaved + // parallelism + let proof_msg = receiver.recv().map_err(|_| { + ParallelStateRootError::StorageRoot( + reth_execution_errors::StorageRootError::Database( + DatabaseError::Other(format!( + "Storage proof channel closed for {hashed_address}" + )), + ), + ) + })?; - /// Dispatch a storage proof computation to storage worker pool - /// - /// The result will be sent via the `proof_result_sender` channel. - pub fn dispatch_storage_proof( - &self, - input: StorageProofInput, - proof_result_sender: ProofResultContext, - ) -> Result<(), ProviderError> { - self.storage_work_tx - .send(StorageWorkerJob::StorageProof { input, proof_result_sender }) - .map_err(|err| { - let error = - ProviderError::other(std::io::Error::other("storage workers unavailable")); + // Extract storage proof from the multiproof wrapper + let (mut multiproof, _stats) = proof_msg.result?; + let proof = + multiproof.storages.remove(&hashed_address).ok_or_else(|| { + ParallelStateRootError::Other(format!( + "storage proof not found in multiproof for {hashed_address}" + )) + })?; - if let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0 { - let ProofResultContext { - sender: result_tx, - sequence_number: seq, - state, - start_time: start, - } = proof_result_sender; + let root = proof.root; + collected_decoded_storages.insert(hashed_address, proof); + root + } + // Since we do not store all intermediate nodes in the database, there might + // be a possibility of re-adding a non-modified leaf to the hash builder. + None => { + tracker.inc_missed_leaves(); - let _ = result_tx.send(ProofResultMessage { - sequence_number: seq, - result: Err(ParallelStateRootError::Provider(error.clone())), - elapsed: start.elapsed(), - state, - }); - } + match ctx.missed_leaves_storage_roots.entry(hashed_address) { + dashmap::Entry::Occupied(occ) => *occ.get(), + dashmap::Entry::Vacant(vac) => { + let root = StorageProof::new_hashed( + trie_cursor_factory.clone(), + hashed_cursor_factory.clone(), + hashed_address, + ) + .with_prefix_set_mut(Default::default()) + .storage_multiproof( + ctx.targets.get(&hashed_address).cloned().unwrap_or_default(), + ) + .map_err(|e| { + ParallelStateRootError::StorageRoot( + reth_execution_errors::StorageRootError::Database( + DatabaseError::Other(e.to_string()), + ), + ) + })? 
+ .root; + + vac.insert(root); + root + } + } + } + }; + + // Encode account + account_rlp.clear(); + let account = account.into_trie_account(root); + account.encode(&mut account_rlp as &mut dyn BufMut); - error - }) + hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp); + } + } } - /// Dispatch an account multiproof computation - /// - /// The result will be sent via the `result_sender` channel included in the input. - pub fn dispatch_account_multiproof( - &self, - input: AccountMultiproofInput, - ) -> Result<(), ProviderError> { - self.account_work_tx - .send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) }) - .map_err(|err| { - let error = - ProviderError::other(std::io::Error::other("account workers unavailable")); + // Consume remaining storage proof receivers for accounts not encountered during trie walk. + for (hashed_address, receiver) in storage_proof_receivers { + if let Ok(proof_msg) = receiver.recv() { + // Extract storage proof from the result + if let Ok(ProofResult::StorageProof { hashed_address: addr, proof }) = proof_msg.result && + addr == hashed_address + { + collected_decoded_storages.insert(hashed_address, proof); + } + } + } - if let AccountWorkerJob::AccountMultiproof { input } = err.0 { - let AccountMultiproofInput { - proof_result_sender: - ProofResultContext { - sender: result_tx, - sequence_number: seq, - state, - start_time: start, - }, - .. - } = *input; + let _ = hash_builder.root(); - let _ = result_tx.send(ProofResultMessage { - sequence_number: seq, - result: Err(ParallelStateRootError::Provider(error.clone())), - elapsed: start.elapsed(), - state, - }); - } + let account_subtree_raw_nodes = hash_builder.take_proof_nodes(); + let decoded_account_subtree = DecodedProofNodes::try_from(account_subtree_raw_nodes)?; - error - }) - } + let (branch_node_hash_masks, branch_node_tree_masks) = if ctx.collect_branch_node_masks { + let updated_branch_nodes = hash_builder.updated_branch_nodes.unwrap_or_default(); + ( + updated_branch_nodes.iter().map(|(path, node)| (*path, node.hash_mask)).collect(), + updated_branch_nodes.into_iter().map(|(path, node)| (path, node.tree_mask)).collect(), + ) + } else { + (Default::default(), Default::default()) + }; - /// Dispatch blinded storage node request to storage worker pool - pub(crate) fn dispatch_blinded_storage_node( - &self, - account: B256, - path: Nibbles, - ) -> Result, ProviderError> { - let (tx, rx) = channel(); - self.storage_work_tx - .send(StorageWorkerJob::BlindedStorageNode { account, path, result_sender: tx }) - .map_err(|_| { - ProviderError::other(std::io::Error::other("storage workers unavailable")) - })?; + Ok(DecodedMultiProof { + account_subtree: decoded_account_subtree, + branch_node_hash_masks, + branch_node_tree_masks, + storages: collected_decoded_storages, + }) +} - Ok(rx) - } +/// Queues storage proofs for all accounts in the targets and returns receivers. +/// +/// This function queues all storage proof tasks to the worker pool but returns immediately +/// with receivers, allowing the account trie walk to proceed in parallel with storage proof +/// computation. This enables interleaved parallelism for better performance. +/// +/// Propagates errors up if queuing fails. Receivers must be consumed by the caller. 
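[Editor's note: a minimal sketch, not part of the patch, of the dispatch/consume pattern described above, assuming `storage_work_tx`, `targets`, `storage_prefix_sets`, and `hashed_address` are in scope; error handling is elided.]

    let mut receivers = dispatch_storage_proofs(
        &storage_work_tx,
        &targets,
        &mut storage_prefix_sets,
        true, // with_branch_node_masks
        None, // multi_added_removed_keys
    )?;
    // The account trie walk starts immediately; each receiver is awaited only
    // when the walk reaches that account's leaf, interleaving the two computations.
    if let Some(rx) = receivers.remove(&hashed_address) {
        let proof_msg = rx.recv().expect("storage worker pool is alive");
    }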
+fn dispatch_storage_proofs(
+    storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
+    targets: &MultiProofTargets,
+    storage_prefix_sets: &mut B256Map<PrefixSet>,
+    with_branch_node_masks: bool,
+    multi_added_removed_keys: Option<&Arc<MultiAddedRemovedKeys>>,
+) -> Result<B256Map<CrossbeamReceiver<ProofResultMessage>>, ParallelStateRootError> {
+    let mut storage_proof_receivers =
+        B256Map::with_capacity_and_hasher(targets.len(), Default::default());
 
-    /// Dispatch blinded account node request to account worker pool
-    pub(crate) fn dispatch_blinded_account_node(
-        &self,
-        path: Nibbles,
-    ) -> Result<Receiver<Result<Option<RevealedNode>, SparseTrieError>>, ProviderError> {
-        let (tx, rx) = channel();
-        self.account_work_tx
-            .send(AccountWorkerJob::BlindedAccountNode { path, result_sender: tx })
+    // Dispatch all storage proofs to worker pool
+    for (hashed_address, target_slots) in targets.iter() {
+        let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default();
+
+        // Create channel for receiving ProofResultMessage
+        let (result_tx, result_rx) = crossbeam_channel::unbounded();
+        let start = Instant::now();
+
+        // Create computation input (data only, no communication channel)
+        let input = StorageProofInput::new(
+            *hashed_address,
+            prefix_set,
+            target_slots.clone(),
+            with_branch_node_masks,
+            multi_added_removed_keys.cloned(),
+        );
+
+        // Always dispatch a storage proof so we obtain the storage root even when no slots are
+        // requested.
+        storage_work_tx
+            .send(StorageWorkerJob::StorageProof {
+                input,
+                proof_result_sender: ProofResultContext::new(
+                    result_tx,
+                    0,
+                    HashedPostState::default(),
+                    start,
+                ),
+            })
             .map_err(|_| {
-                ProviderError::other(std::io::Error::other("account workers unavailable"))
+                ParallelStateRootError::Other(format!(
+                    "Failed to queue storage proof for {}: storage worker pool unavailable",
+                    hashed_address
+                ))
             })?;
 
-        Ok(rx)
+        storage_proof_receivers.insert(*hashed_address, result_rx);
     }
+
+    Ok(storage_proof_receivers)
 }
 
-impl TrieNodeProviderFactory for ProofWorkerHandle {
-    type AccountNodeProvider = ProofTaskTrieNodeProvider;
-    type StorageNodeProvider = ProofTaskTrieNodeProvider;
+/// Input parameters for storage proof computation.
+#[derive(Debug)]
+pub struct StorageProofInput {
+    /// The hashed address for which the proof is calculated.
+    hashed_address: B256,
+    /// The prefix set for the proof calculation.
+    prefix_set: PrefixSet,
+    /// The target slots for the proof calculation.
+    target_slots: B256Set,
+    /// Whether or not to collect branch node masks
+    with_branch_node_masks: bool,
+    /// Provided by the user to give the necessary context to retain extra proofs.
+    multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
+}
 
-    fn account_node_provider(&self) -> Self::AccountNodeProvider {
-        ProofTaskTrieNodeProvider::AccountNode { handle: self.clone() }
+impl StorageProofInput {
+    /// Creates a new [`StorageProofInput`] with the given hashed address, prefix set, and target
+    /// slots.
+    pub const fn new(
+        hashed_address: B256,
+        prefix_set: PrefixSet,
+        target_slots: B256Set,
+        with_branch_node_masks: bool,
+        multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
+    ) -> Self {
+        Self {
+            hashed_address,
+            prefix_set,
+            target_slots,
+            with_branch_node_masks,
+            multi_added_removed_keys,
+        }
     }
+}
 
-    fn storage_node_provider(&self, account: B256) -> Self::StorageNodeProvider {
-        ProofTaskTrieNodeProvider::StorageNode { account, handle: self.clone() }
-    }
+/// Input parameters for account multiproof computation.
+#[derive(Debug, Clone)]
+pub struct AccountMultiproofInput {
+    /// The targets for which to compute the multiproof.
+    pub targets: MultiProofTargets,
+    /// The prefix sets for the proof calculation.
+    pub prefix_sets: TriePrefixSets,
+    /// Whether or not to collect branch node masks.
+    pub collect_branch_node_masks: bool,
+    /// Provided by the user to give the necessary context to retain extra proofs.
+    pub multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
+    /// Cached storage proof roots for missed leaves encountered during account trie walk.
+    pub missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
+    /// Context for sending the proof result.
+    pub proof_result_sender: ProofResultContext,
 }
 
-/// Trie node provider for retrieving trie nodes by path.
+/// Parameters for building an account multiproof with pre-computed storage roots.
+struct AccountMultiproofParams<'a> {
+    /// The targets for which to compute the multiproof.
+    targets: &'a MultiProofTargets,
+    /// The prefix set for the account trie walk.
+    prefix_set: PrefixSet,
+    /// Whether or not to collect branch node masks.
+    collect_branch_node_masks: bool,
+    /// Provided by the user to give the necessary context to retain extra proofs.
+    multi_added_removed_keys: Option<&'a Arc<MultiAddedRemovedKeys>>,
+    /// Receivers for storage proofs being computed in parallel.
+    storage_proof_receivers: B256Map<CrossbeamReceiver<ProofResultMessage>>,
+    /// Cached storage proof roots for missed leaves encountered during account trie walk.
+    missed_leaves_storage_roots: &'a DashMap<B256, B256>,
+}
+
+/// Internal message for account workers.
 #[derive(Debug)]
-pub enum ProofTaskTrieNodeProvider {
-    /// Blinded account trie node provider.
-    AccountNode {
-        /// Handle to the proof worker pools.
-        handle: ProofWorkerHandle,
+enum AccountWorkerJob {
+    /// Account multiproof computation request
+    AccountMultiproof {
+        /// Account multiproof input parameters
+        input: Box<AccountMultiproofInput>,
     },
-    /// Blinded storage trie node provider.
-    StorageNode {
-        /// Target account.
-        account: B256,
-        /// Handle to the proof worker pools.
-        handle: ProofWorkerHandle,
+    /// Blinded account node retrieval request
+    BlindedAccountNode {
+        /// Path to the account node
+        path: Nibbles,
+        /// Channel to send result back to original caller
+        result_sender: Sender<Result<Option<RevealedNode>, SparseTrieError>>,
     },
 }
 
-impl TrieNodeProvider for ProofTaskTrieNodeProvider {
-    fn trie_node(&self, path: &Nibbles) -> Result<Option<RevealedNode>, SparseTrieError> {
-        match self {
-            Self::AccountNode { handle } => {
-                let rx = handle
-                    .dispatch_blinded_account_node(*path)
-                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
-                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
-            }
-            Self::StorageNode { handle, account } => {
-                let rx = handle
-                    .dispatch_blinded_storage_node(*account, *path)
-                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
-                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
- } - } - } -} - #[cfg(test)] mod tests { use super::*; From d46b30f155192965b507e7ff4bbc25a673847735 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Mon, 27 Oct 2025 04:46:50 +0000 Subject: [PATCH 3/8] standardise _guard to _span_guard --- crates/trie/parallel/src/proof_task.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index f210d1b0ccf..53b3986604d 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -143,7 +143,7 @@ impl ProofWorkerHandle { let storage_worker_parent = debug_span!(target: "trie::proof_task", "Storage worker tasks", ?storage_worker_count); - let _guard = storage_worker_parent.enter(); + let _span_guard = storage_worker_parent.enter(); // Spawn storage workers for worker_id in 0..storage_worker_count { @@ -157,7 +157,7 @@ impl ProofWorkerHandle { #[cfg(feature = "metrics")] let metrics = ProofTaskTrieMetrics::default(); - let _guard = parent_span.enter(); + let _span_guard = parent_span.enter(); storage_worker_loop( view_clone, task_ctx_clone, @@ -176,11 +176,11 @@ impl ProofWorkerHandle { ); } - drop(_guard); + drop(_span_guard); let account_worker_parent = debug_span!(target: "trie::proof_task", "Account worker tasks", ?account_worker_count); - let _guard = account_worker_parent.enter(); + let _span_guard = account_worker_parent.enter(); // Spawn account workers for worker_id in 0..account_worker_count { @@ -195,7 +195,7 @@ impl ProofWorkerHandle { #[cfg(feature = "metrics")] let metrics = ProofTaskTrieMetrics::default(); - let _guard = parent_span.enter(); + let _span_guard = parent_span.enter(); account_worker_loop( view_clone, task_ctx_clone, @@ -215,7 +215,7 @@ impl ProofWorkerHandle { ); } - drop(_guard); + drop(_span_guard); Self::new_handle( storage_work_tx, From e0050580284f5f525de567a866310fa823408a53 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Mon, 27 Oct 2025 04:47:46 +0000 Subject: [PATCH 4/8] edge case error fix --- crates/trie/parallel/src/proof_task.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index 53b3986604d..8d4c0a8c27c 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -939,6 +939,8 @@ fn account_worker_loop( elapsed: start.elapsed(), state, }); + // Mark worker as available again before skipping the job. + available_workers.fetch_add(1, Ordering::Relaxed); continue; } }; From d29a6d471172594acc33d127418d47edffcd022c Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Mon, 27 Oct 2025 04:48:03 +0000 Subject: [PATCH 5/8] error handling of proof result --- crates/trie/parallel/src/proof_task.rs | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index 8d4c0a8c27c..582cbc18ecd 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -1128,14 +1128,22 @@ where ) })?; - // Extract storage proof from the multiproof wrapper - let (mut multiproof, _stats) = proof_msg.result?; - let proof = - multiproof.storages.remove(&hashed_address).ok_or_else(|| { - ParallelStateRootError::Other(format!( - "storage proof not found in multiproof for {hashed_address}" - )) - })?; + // Extract storage proof from the result + let proof = match proof_msg.result? 
{ + ProofResult::StorageProof { hashed_address: addr, proof } => { + if addr != hashed_address { + return Err(ParallelStateRootError::Other(format!( + "storage proof address mismatch: expected {hashed_address}, got {addr}" + ))); + } + proof + } + ProofResult::AccountMultiproof(..) => { + return Err(ParallelStateRootError::Other( + "expected storage proof, got account multiproof".to_string(), + )); + } + }; let root = proof.root; collected_decoded_storages.insert(hashed_address, proof); From f3913f47c9343351f9200538456e1ebd56b15527 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Mon, 27 Oct 2025 04:53:01 +0000 Subject: [PATCH 6/8] rename --- crates/trie/parallel/src/proof.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/trie/parallel/src/proof.rs b/crates/trie/parallel/src/proof.rs index e8b39f38ec6..63d26993d50 100644 --- a/crates/trie/parallel/src/proof.rs +++ b/crates/trie/parallel/src/proof.rs @@ -88,7 +88,7 @@ impl ParallelProof { self } /// Queues a storage proof task and returns a receiver for the result. - fn queue_storage_proof( + fn send_storage_proof( &self, hashed_address: B256, prefix_set: PrefixSet, @@ -132,7 +132,7 @@ impl ParallelProof { "Starting storage proof generation" ); - let receiver = self.queue_storage_proof(hashed_address, prefix_set, target_slots)?; + let receiver = self.send_storage_proof(hashed_address, prefix_set, target_slots)?; let proof_msg = receiver.recv().map_err(|_| { ParallelStateRootError::StorageRoot(StorageRootError::Database(DatabaseError::Other( format!("channel closed for {hashed_address}"), From dddc9e12370a7e159e82b2269d450fcd09f7a141 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Mon, 27 Oct 2025 05:11:21 +0000 Subject: [PATCH 7/8] refactor(multiproof): streamline dispatch logic and add multiproof conversion - Updated the `dispatch` method to accept `MultiproofInput` directly, simplifying the handling of proof targets. - Removed the separate handling for `PendingMultiproofTask`, consolidating the dispatch logic. - Introduced `into_multiproof` method in `ProofResult` for converting proof results into `DecodedMultiProof`, enhancing clarity and usability. --- .../src/tree/payload_processor/multiproof.rs | 124 ++++-------------- crates/trie/parallel/src/proof_task.rs | 15 +++ 2 files changed, 41 insertions(+), 98 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 0baafeda6ca..aa6c6b5786d 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -72,87 +72,18 @@ impl MultiproofManager { } /// Dispatches a new multiproof calculation to worker pools. - fn dispatch(&mut self, input: PendingMultiproofTask) { + fn dispatch(&mut self, input: MultiproofInput) { // If there are no proof targets, we can just send an empty multiproof back immediately - if input.proof_targets_is_empty() { + if input.proof_targets.is_empty() { debug!( - sequence_number = input.proof_sequence_number(), + sequence_number = input.proof_sequence_number, "No proof targets, sending empty multiproof back immediately" ); input.send_empty_proof(); return } - match input { - PendingMultiproofTask::Storage(storage_input) => { - self.dispatch_storage_proof(storage_input); - } - PendingMultiproofTask::Regular(multiproof_input) => { - self.dispatch_multiproof(multiproof_input); - } - } - } - - /// Dispatches a single storage proof calculation to worker pool. 
- fn dispatch_storage_proof(&mut self, storage_multiproof_input: StorageMultiproofInput) { - let StorageMultiproofInput { - hashed_state_update, - hashed_address, - proof_targets, - proof_sequence_number, - multi_added_removed_keys, - state_root_message_sender: _, - } = storage_multiproof_input; - - let storage_targets = proof_targets.len(); - - trace!( - target: "engine::tree::payload_processor::multiproof", - proof_sequence_number, - ?proof_targets, - storage_targets, - "Dispatching storage proof to workers" - ); - - let start = Instant::now(); - - // Create prefix set from targets - let prefix_set = reth_trie::prefix_set::PrefixSetMut::from( - proof_targets.iter().map(reth_trie::Nibbles::unpack), - ); - let prefix_set = prefix_set.freeze(); - - // Build computation input (data only) - let input = reth_trie_parallel::proof_task::StorageProofInput::new( - hashed_address, - prefix_set, - proof_targets, - true, // with_branch_node_masks - Some(multi_added_removed_keys), - ); - - // Dispatch to storage worker - if let Err(e) = self.proof_worker_handle.dispatch_storage_proof( - input, - reth_trie_parallel::proof_task::ProofResultContext::new( - self.proof_result_tx.clone(), - proof_sequence_number, - hashed_state_update, - start, - ), - ) { - error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch storage proof"); - return; - } - - self.inflight += 1; - self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64); - self.metrics - .pending_storage_multiproofs_histogram - .record(self.proof_worker_handle.pending_storage_tasks() as f64); - self.metrics - .pending_account_multiproofs_histogram - .record(self.proof_worker_handle.pending_account_tasks() as f64); + self.dispatch_multiproof(input); } /// Signals that a multiproof calculation has finished. 
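[Editor's note: a minimal sketch, not part of the patch, of the consolidated call shape after this change: every caller now builds a `MultiproofInput` and hands it straight to `dispatch`, which performs the empty-target short-circuit itself. `manager`, `config`, `proof_targets`, `sequencer`, and `tx` are assumed to be in scope.]

    manager.dispatch(MultiproofInput {
        config: config.clone(),
        source: None,
        hashed_state_update: Default::default(),
        proof_targets,
        proof_sequence_number: sequencer.next_sequence(),
        state_root_message_sender: tx.clone(),
        multi_added_removed_keys: None,
    });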
@@ -425,18 +356,15 @@ impl MultiProofTask { self.multiproof_manager.proof_worker_handle.has_available_storage_workers(); let mut dispatch = |proof_targets| { - self.multiproof_manager.dispatch( - MultiproofInput { - config: self.config.clone(), - source: None, - hashed_state_update: Default::default(), - proof_targets, - proof_sequence_number: self.proof_sequencer.next_sequence(), - state_root_message_sender: self.tx.clone(), - multi_added_removed_keys: Some(multi_added_removed_keys.clone()), - } - .into(), - ); + self.multiproof_manager.dispatch(MultiproofInput { + config: self.config.clone(), + source: None, + hashed_state_update: Default::default(), + proof_targets, + proof_sequence_number: self.proof_sequencer.next_sequence(), + state_root_message_sender: self.tx.clone(), + multi_added_removed_keys: Some(multi_added_removed_keys.clone()), + }); chunks += 1; }; @@ -573,18 +501,15 @@ impl MultiProofTask { ); spawned_proof_targets.extend_ref(&proof_targets); - self.multiproof_manager.dispatch( - MultiproofInput { - config: self.config.clone(), - source: Some(source), - hashed_state_update, - proof_targets, - proof_sequence_number: self.proof_sequencer.next_sequence(), - state_root_message_sender: self.tx.clone(), - multi_added_removed_keys: Some(multi_added_removed_keys.clone()), - } - .into(), - ); + self.multiproof_manager.dispatch(MultiproofInput { + config: self.config.clone(), + source: Some(source), + hashed_state_update, + proof_targets, + proof_sequence_number: self.proof_sequencer.next_sequence(), + state_root_message_sender: self.tx.clone(), + multi_added_removed_keys: Some(multi_added_removed_keys.clone()), + }); chunks += 1; }; @@ -800,7 +725,7 @@ impl MultiProofTask { // Convert ProofResultMessage to SparseTrieUpdate match proof_result.result { - Ok((multiproof, _stats)) => { + Ok(proof_result_data) => { debug!( target: "engine::tree::payload_processor::multiproof", sequence = proof_result.sequence_number, @@ -808,6 +733,9 @@ impl MultiProofTask { "Processing calculated proof from worker" ); + // Convert ProofResult to DecodedMultiProof + let multiproof = proof_result_data.into_multiproof(); + let update = SparseTrieUpdate { state: proof_result.state, multiproof, diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index 582cbc18ecd..d895866b4d9 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -564,6 +564,21 @@ pub enum ProofResult { }, } +impl ProofResult { + /// Convert this proof result into a `DecodedMultiProof`. + /// + /// For account multiproofs, returns the multiproof directly (discarding stats). + /// For storage proofs, wraps the storage proof into a minimal multiproof. + pub fn into_multiproof(self) -> DecodedMultiProof { + match self { + ProofResult::AccountMultiproof(multiproof, _stats) => multiproof, + ProofResult::StorageProof { hashed_address, proof } => { + DecodedMultiProof::from_storage_proof(hashed_address, proof) + } + } + } +} + /// Channel used by worker threads to deliver `ProofResultMessage` items back to /// `MultiProofTask`. /// From ff2c898ee2c27a49da3ba889baca984b264bc472 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Mon, 27 Oct 2025 05:29:32 +0000 Subject: [PATCH 8/8] refactor(proof): streamline storage proof extraction and error handling - Updated the extraction of storage proofs to directly match against `ProofResult`, improving clarity and reducing unnecessary complexity. 
- Replaced error handling for mismatched addresses with `debug_assert_eq!` for better debugging during development. - Ensured that unreachable code paths are clearly defined for account multiproof handling, enhancing code safety and maintainability. --- crates/trie/parallel/src/proof.rs | 32 ++++++++++++++++++-------- crates/trie/parallel/src/proof_task.rs | 18 +++++++-------- 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/crates/trie/parallel/src/proof.rs b/crates/trie/parallel/src/proof.rs index 63d26993d50..4c729cc617c 100644 --- a/crates/trie/parallel/src/proof.rs +++ b/crates/trie/parallel/src/proof.rs @@ -139,15 +139,20 @@ impl ParallelProof { ))) })?; - // Extract the multiproof from the result - let (mut multiproof, _stats) = proof_msg.result?; - - // Extract storage proof from the multiproof - let storage_proof = multiproof.storages.remove(&hashed_address).ok_or_else(|| { - ParallelStateRootError::StorageRoot(StorageRootError::Database(DatabaseError::Other( - format!("storage proof not found in multiproof for {hashed_address}"), - ))) - })?; + // Extract storage proof directly from the result + let storage_proof = match proof_msg.result? { + crate::proof_task::ProofResult::StorageProof { hashed_address: addr, proof } => { + debug_assert_eq!( + addr, + hashed_address, + "storage worker must return same address: expected {hashed_address}, got {addr}" + ); + proof + } + crate::proof_task::ProofResult::AccountMultiproof(..) => { + unreachable!("storage worker only sends StorageProof variant") + } + }; trace!( target: "trie::parallel_proof", @@ -231,7 +236,14 @@ impl ParallelProof { ) })?; - let (multiproof, stats) = proof_result_msg.result?; + let (multiproof, stats) = match proof_result_msg.result? { + crate::proof_task::ProofResult::AccountMultiproof(multiproof, stats) => { + (multiproof, stats) + } + crate::proof_task::ProofResult::StorageProof { .. } => { + unreachable!("account worker only sends AccountMultiproof variant") + } + }; #[cfg(feature = "metrics")] self.metrics.record(stats); diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index d895866b4d9..51a5d9aa068 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -571,8 +571,8 @@ impl ProofResult { /// For storage proofs, wraps the storage proof into a minimal multiproof. pub fn into_multiproof(self) -> DecodedMultiProof { match self { - ProofResult::AccountMultiproof(multiproof, _stats) => multiproof, - ProofResult::StorageProof { hashed_address, proof } => { + Self::AccountMultiproof(multiproof, _stats) => multiproof, + Self::StorageProof { hashed_address, proof } => { DecodedMultiProof::from_storage_proof(hashed_address, proof) } } @@ -1146,17 +1146,15 @@ where // Extract storage proof from the result let proof = match proof_msg.result? { ProofResult::StorageProof { hashed_address: addr, proof } => { - if addr != hashed_address { - return Err(ParallelStateRootError::Other(format!( - "storage proof address mismatch: expected {hashed_address}, got {addr}" - ))); - } + debug_assert_eq!( + addr, + hashed_address, + "storage worker must return same address: expected {hashed_address}, got {addr}" + ); proof } ProofResult::AccountMultiproof(..) => { - return Err(ParallelStateRootError::Other( - "expected storage proof, got account multiproof".to_string(), - )); + unreachable!("storage worker only sends StorageProof variant") } };
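[Editor's note: taken together, patches 7 and 8 make `ProofResult` the single result type delivered to `MultiProofTask`; a minimal sketch, not part of the patch, of the receiving side, assuming a `proof_result: ProofResultMessage` in scope.]

    match proof_result.result {
        Ok(proof) => {
            // Uniform for both variants: a storage proof is wrapped into a
            // minimal multiproof via `DecodedMultiProof::from_storage_proof`.
            let multiproof = proof.into_multiproof();
            // ...apply `multiproof` to the sparse trie as a `SparseTrieUpdate`...
        }
        Err(error) => {
            // A `ParallelStateRootError` surfaces to the task's event loop.
            let _ = error;
        }
    }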