From fbc54d03a5092e8bff60c59b43785762b05a1d4d Mon Sep 17 00:00:00 2001 From: gaoxin Date: Thu, 31 Oct 2024 22:55:00 +0800 Subject: [PATCH] opt: async commit transitions and update caches --- src/partition.rs | 9 +++- src/scheduler.rs | 136 +++++++++++++++++++++++++++++++++++------------ src/storage.rs | 86 ++++++++++++++++++------------ 3 files changed, 161 insertions(+), 70 deletions(-) diff --git a/src/partition.rs b/src/partition.rs index ac8e9c4..d8ab27d 100644 --- a/src/partition.rs +++ b/src/partition.rs @@ -10,7 +10,7 @@ use revm::{ }; use std::{ collections::BTreeSet, - sync::Arc, + sync::{mpsc::Sender, Arc}, time::{Duration, Instant}, }; @@ -50,6 +50,7 @@ where /// allowing modification of transaction states during execution tx_states: SharedTxStates, txs: Arc>, + pre_commit_sender: Sender, pub partition_db: PartitionDB, pub assigned_txs: Vec, @@ -73,6 +74,7 @@ where txs: Arc>, tx_states: SharedTxStates, assigned_txs: Vec, + pre_commit_sender: Sender, ) -> Self { let coinbase = env.block.coinbase; let partition_db = PartitionDB::new(coinbase, scheduler_db); @@ -85,6 +87,7 @@ where txs, partition_db, assigned_txs, + pre_commit_sender, error_txs: HashMap::new(), tx_dependency: vec![], metrics: Default::default(), @@ -126,6 +129,7 @@ where should_execute = false; self.metrics.reusable_tx_cnt += 1; tx_states[txid].tx_status = TransactionStatus::SkipValidation; + self.pre_commit_sender.send(txid).unwrap(); } } if should_execute { @@ -174,6 +178,9 @@ where miner_update, }, }; + if skip_validation { + self.pre_commit_sender.send(txid).unwrap(); + } } Err(err) => { // In a parallel execution environment, transactions might fail due to diff --git a/src/scheduler.rs b/src/scheduler.rs index a80eefa..3a9e526 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -7,15 +7,8 @@ use crate::{ GrevmError, LocationAndType, ResultAndTransition, TransactionStatus, TxId, CPU_CORES, GREVM_RUNTIME, MAX_NUM_ROUND, }; -use fastrace::Span; -use std::{ - collections::BTreeSet, - ops::DerefMut, - sync::{Arc, RwLock}, - time::{Duration, Instant}, -}; - use ahash::{AHashMap as HashMap, AHashSet as HashSet}; +use fastrace::Span; use metrics::{counter, gauge}; use revm::{ db::{states::bundle_state::BundleRetention, BundleState}, @@ -24,6 +17,18 @@ use revm::{ }, CacheState, DatabaseRef, EvmBuilder, }; +use std::{ + cmp::Reverse, + collections::{BTreeSet, BinaryHeap}, + ops::DerefMut, + sync::{ + mpsc::{channel, Receiver, Sender}, + Arc, Mutex, RwLock, + }, + thread, + time::{Duration, Instant}, +}; +use tokio::task::JoinHandle; use tracing::info; struct ExecuteMetrics { @@ -178,7 +183,7 @@ where partition_executors: Vec>>>, /// number of finality txs in the current round num_finality_txs: usize, - results: Vec, + results: Arc>>, metrics: ExecuteMetrics, } @@ -257,11 +262,50 @@ where partitioned_txs: vec![], partition_executors: vec![], num_finality_txs: 0, - results: Vec::with_capacity(num_txs), + results: Arc::new(Mutex::new(Vec::with_capacity(num_txs))), metrics: Default::default(), } } + fn async_commit( + database: Arc>, + tx_states: SharedTxStates, + results: Arc>>, + num_finality_txs: TxId, + rx: Receiver, + ) -> JoinHandle<(TxId, LazyUpdateValue)> { + GREVM_RUNTIME.spawn(async move { + #[allow(invalid_reference_casting)] + let database = + unsafe { &mut *(&(*database) as *const SchedulerDB as *mut SchedulerDB) }; + #[allow(invalid_reference_casting)] + let tx_states = + unsafe { &mut *(&(*tx_states) as *const Vec as *mut Vec) }; + let mut results = results.lock().unwrap(); + let mut update_miner = LazyUpdateValue::default(); + + let eof_signal = tx_states.len(); + let mut commit_txs = vec![false; eof_signal - num_finality_txs]; + let mut index = num_finality_txs; + while let Ok(txid) = rx.recv() { + if txid == eof_signal { + break; + } + commit_txs[txid - num_finality_txs] = true; + while index < eof_signal && commit_txs[index - num_finality_txs] { + database.commit_transition(std::mem::take( + &mut tx_states[index].execute_result.transition, + )); + results.push(tx_states[index].execute_result.result.clone().unwrap()); + update_miner = + update_miner.merge(tx_states[index].execute_result.miner_update.clone()); + index += 1; + } + } + (index, update_miner) + }) + } + /// Get the partitioned transactions by dependencies. #[fastrace::trace] pub(crate) fn partition_transactions(&mut self) { @@ -289,6 +333,7 @@ where fn round_execute(&mut self) -> Result<(), GrevmError> { self.metrics.parallel_execute_calls.increment(1); self.partition_executors.clear(); + let (tx, rx) = channel(); for partition_id in 0..self.num_partitions { let executor = PartitionExecutor::new( self.spec_id, @@ -298,11 +343,19 @@ where self.txs.clone(), self.tx_states.clone(), self.partitioned_txs[partition_id].clone(), + tx.clone(), ); self.partition_executors.push(Arc::new(RwLock::new(executor))); } let start = Instant::now(); + let commit_handle = Self::async_commit( + self.database.clone(), + self.tx_states.clone(), + self.results.clone(), + self.num_finality_txs, + rx, + ); GREVM_RUNTIME.block_on(async { let mut tasks = vec![]; for executor in &self.partition_executors { @@ -313,7 +366,7 @@ where }); self.metrics.parallel_execute_time.increment(start.elapsed().as_nanos() as u64); - self.validate_transactions() + self.validate_transactions(tx, commit_handle) } /// Merge write set after each round @@ -375,7 +428,7 @@ where /// Generate unconfirmed transactions, and find the continuous minimum TxID, /// which can be marked as finality transactions. #[fastrace::trace] - fn generate_unconfirmed_txs(&mut self) { + fn generate_unconfirmed_txs(&mut self, commit_sender: Sender) { let num_partitions = self.num_partitions; let (end_skip_id, merged_write_set) = self.merge_write_set(); self.metrics.skip_validation_cnt.increment((end_skip_id - self.num_finality_txs) as u64); @@ -414,6 +467,11 @@ where } } } + if !conflict && tx_states[txid].tx_status != TransactionStatus::SkipValidation { + // when tx_status == SkipValidation, txid has ready been sent by + // PartitionExecutor + commit_sender.send(txid).unwrap(); + } } executor.tx_dependency.push(updated_dependencies); tx_states[txid].tx_status = if conflict { @@ -423,6 +481,9 @@ where } } }); + // send `self.tx_states.len()` as the EOF signal + commit_sender.send(self.tx_states.len()).unwrap(); + drop(commit_sender); } /// Find the continuous minimum TxID, which can be marked as finality transactions. @@ -485,7 +546,11 @@ where /// Commit the transition of the finality transactions, and update the minner's rewards. #[fastrace::trace] - fn commit_transition(&mut self, finality_tx_cnt: usize) -> Result<(), GrevmError> { + fn commit_transition( + &mut self, + finality_tx_cnt: usize, + commit_handle: JoinHandle<(TxId, LazyUpdateValue)>, + ) -> Result<(), GrevmError> { let start = Instant::now(); let partition_state: Vec = self .partition_executors @@ -498,34 +563,29 @@ where // MUST drop the `PartitionExecutor::scheduler_db` before get mut self.partition_executors.clear(); + + let span = Span::enter_with_local_parent("async commit transitions"); + let (async_commit_id, update_miner) = + GREVM_RUNTIME.block_on(async { commit_handle.await.unwrap() }); + assert_eq!(async_commit_id, self.num_finality_txs); + drop(span); + + // there's a database reference in commit_handle, + // so we should wait the end of the async commit operation. let database = Arc::get_mut(&mut self.database).unwrap(); + database.merge_pre_commit(); if self.num_finality_txs < self.txs.len() { // Merging these states is only useful when there is a next round of execution. Self::merge_not_modified_state(&mut database.cache, partition_state); } - #[allow(invalid_reference_casting)] - let tx_states = - unsafe { &mut *(&(*self.tx_states) as *const Vec as *mut Vec) }; - let mut miner_updates = Vec::with_capacity(finality_tx_cnt); - let start_txid = self.num_finality_txs - finality_tx_cnt; - - let span = Span::enter_with_local_parent("database commit transitions"); - for txid in start_txid..self.num_finality_txs { - miner_updates.push(tx_states[txid].execute_result.miner_update.clone()); - database - .commit_transition(std::mem::take(&mut tx_states[txid].execute_result.transition)); - self.results.push(tx_states[txid].execute_result.result.clone().unwrap()); - } - drop(span); - // Each transaction updates three accounts: from, to, and coinbase. // If every tx updates the coinbase account, it will cause conflicts across all txs. // Therefore, we handle miner rewards separately. We don't record miner’s address in r/w // set, and track the rewards for the miner for each transaction separately. // The miner’s account is only updated after validation by SchedulerDB.increment_balances database - .update_balances(vec![(self.coinbase, LazyUpdateValue::merge(miner_updates))]) + .update_balances(vec![(self.coinbase, update_miner)]) .map_err(|err| GrevmError::EvmError(EVMError::Database(err)))?; self.metrics.commit_transition_time.increment(start.elapsed().as_nanos() as u64); Ok(()) @@ -535,13 +595,17 @@ where /// Because after each round execution, the read-write set is no longer updated. /// We can check in parallel whether the read set is out of bounds. #[fastrace::trace] - fn validate_transactions(&mut self) -> Result<(), GrevmError> { + fn validate_transactions( + &mut self, + commit_sender: Sender, + commit_handle: JoinHandle<(TxId, LazyUpdateValue)>, + ) -> Result<(), GrevmError> { let start = Instant::now(); - self.generate_unconfirmed_txs(); + self.generate_unconfirmed_txs(commit_sender); let finality_tx_cnt = self.find_continuous_min_txid()?; // update and pruning tx dependencies self.update_and_pruning_dependency(); - self.commit_transition(finality_tx_cnt)?; + self.commit_transition(finality_tx_cnt, commit_handle)?; self.metrics.validate_time.increment(start.elapsed().as_nanos() as u64); Ok(()) } @@ -553,7 +617,7 @@ where for partition in partition_state { // merge account state that is not modified for (address, account) in partition.accounts { - if account.status.is_not_modified() && state.accounts.get(&address).is_none() { + if !state.accounts.contains_key(&address) && account.status.is_not_modified() { state.accounts.insert(address, account); } } @@ -581,6 +645,7 @@ where .with_spec_id(self.spec_id) .with_env(Box::new(self.env.clone())) .build(); + let mut results = self.results.lock().unwrap(); for txid in self.num_finality_txs..self.txs.len() { if let Some(tx) = self.txs.get(txid) { *evm.tx_mut() = tx.clone(); @@ -590,7 +655,7 @@ where match evm.transact() { Ok(result_and_state) => { evm.db_mut().commit(result_and_state.state); - self.results.push(result_and_state.result); + results.push(result_and_state.result); } Err(err) => return Err(GrevmError::EvmError(err)), } @@ -606,9 +671,10 @@ where self.partition_executors.clear(); let database = Arc::get_mut(&mut self.database).unwrap(); database.merge_transitions(BundleRetention::Reverts); + let mut results = self.results.lock().unwrap(); let output = ExecuteOutput { state: std::mem::take(&mut database.bundle_state), - results: std::mem::take(&mut self.results), + results: std::mem::take(&mut results), }; self.metrics.build_output_time.increment(start.elapsed().as_nanos() as u64); output diff --git a/src/storage.rs b/src/storage.rs index 50f1001..f359ef8 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -32,43 +32,30 @@ impl Default for LazyUpdateValue { } } -/// Merge multiple LazyUpdateValue into one. +/// Merge other LazyUpdateValue. impl LazyUpdateValue { - pub(crate) fn merge(values: Vec) -> Self { - let mut value: u128 = 0; - let mut positive: bool = true; - for lazy_value in values { - match lazy_value { - Self::Increase(inc) => { - if positive { - value += inc; + pub(crate) fn merge(&self, other: Self) -> Self { + match self.clone() { + Self::Increase(value) => match other { + Self::Increase(inc) => Self::Increase(value + inc), + Self::Decrease(dec) => { + if value >= dec { + Self::Increase(value - dec) } else { - if value > inc { - value -= inc - } else { - value = inc - value; - positive = true; - } + Self::Decrease(dec - value) } } - Self::Decrease(dec) => { - if positive { - if value > dec { - value -= dec; - } else { - value = dec - value; - positive = false; - } + }, + Self::Decrease(value) => match other { + Self::Increase(inc) => { + if inc >= value { + Self::Increase(inc - value) } else { - value += dec; + Self::Decrease(value - inc) } } - } - } - if positive { - Self::Increase(value) - } else { - Self::Decrease(value) + Self::Decrease(dec) => Self::Decrease(value + dec), + }, } } } @@ -169,6 +156,8 @@ pub(crate) struct SchedulerDB { /// executors. When fall back to sequential execution, used as cached state contains both /// changed from evm execution and cached/loaded account/storages. pub cache: CacheState, + pre_commit_accounts: HashMap, + pub database: DB, /// Block state, it aggregates transactions transitions into one state. /// @@ -192,6 +181,7 @@ impl SchedulerDB { pub(crate) fn new(database: DB) -> Self { Self { cache: CacheState::new(false), + pre_commit_accounts: HashMap::new(), database, transition_state: Some(TransitionState::default()), bundle_state: BundleState::default(), @@ -204,10 +194,27 @@ impl SchedulerDB { /// partition executors. When falling back to sequential execution, these cached states will /// include both the changes from EVM execution and the cached/loaded accounts/storages. pub(crate) fn commit_transition(&mut self, transitions: Vec<(Address, TransitionAccount)>) { - apply_transition_to_cache(&mut self.cache, &transitions); + apply_transition_to_cache( + &mut self.cache, + Some(&mut self.pre_commit_accounts), + &transitions, + ); self.apply_transition(transitions); } + pub(crate) fn merge_pre_commit(&mut self) { + if !self.pre_commit_accounts.is_empty() { + let pre_commit = std::mem::take(&mut self.pre_commit_accounts); + if self.cache.accounts.is_empty() { + self.cache.accounts = pre_commit; + } else { + for (address, account) in pre_commit.into_iter() { + self.cache.accounts.insert(address, account); + } + } + } + } + /// Fall back to sequential execute pub(crate) fn commit(&mut self, changes: HashMap) { let transitions = self.cache.apply_evm_state(changes); @@ -327,15 +334,26 @@ fn load_storage( /// Apply transition to cache state. fn apply_transition_to_cache( cache: &mut CacheState, + mut pre_commit_accounts: Option<&mut HashMap>, transitions: &Vec<(Address, TransitionAccount)>, ) { for (address, account) in transitions { let new_storage = account.storage.iter().map(|(k, s)| (*k, s.present_value)); - if let Some(entry) = cache.accounts.get_mut(address) { + let update_accounts = if let Some(incremental_cache) = pre_commit_accounts.as_mut() { + if !incremental_cache.contains_key(address) && cache.accounts.contains_key(address) { + incremental_cache + .insert(address.clone(), cache.accounts.get(address).cloned().unwrap()); + } + incremental_cache + } else { + &mut cache.accounts + }; + if let Some(entry) = update_accounts.get_mut(address) { if let Some(new_info) = &account.info { assert!(!account.storage_was_destroyed); if let Some(read_account) = entry.account.as_mut() { // account is loaded + read_account.info = new_info.clone(); read_account.storage.extend(new_storage); } else { @@ -351,7 +369,7 @@ fn apply_transition_to_cache( } entry.status = account.status; } else { - cache.accounts.insert( + update_accounts.insert( *address, CacheAccount { account: account.info.as_ref().map(|info| PlainAccount { @@ -557,7 +575,7 @@ impl PartitionDB { &mut self, transitions: &Vec<(Address, TransitionAccount)>, ) { - apply_transition_to_cache(&mut self.cache, transitions); + apply_transition_to_cache(&mut self.cache, None, transitions); } }