diff --git a/Cargo.toml b/Cargo.toml index a43a2cb..17a6f4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,8 @@ edition = "2021" description = "Create Parallel EVM" [dependencies] -revm = "14.0.0" +revm = { package = "revm", git = "https://github.com/nekomoto911/revm", branch = "v14.0.0" } +revm-primitives = { package = "revm-primitives", git = "https://github.com/nekomoto911/revm", branch = "v14.0.0" } fastrace = "0.7" tracing = "0.1.40" ahash = { version = "0.8.11", features = ["serde"] } @@ -33,7 +34,7 @@ criterion = "0.5.1" metrics-util = "0.17.0" walkdir = "2.5.0" rayon = "1.10.0" -revme = "0.10.3" +revme = { package = "revme", git = "https://github.com/nekomoto911/revm", branch = "v14.0.0" } fastrace = { version = "0.7", features = ["enable"] } fastrace-jaeger = "0.7" tikv-jemallocator = "0.5.0" diff --git a/src/lib.rs b/src/lib.rs index b2e76db..876870d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,7 +44,6 @@ lazy_static! { .unwrap(); } -use crate::storage::LazyUpdateValue; pub use scheduler::*; /// The maximum number of rounds for transaction execution. @@ -133,7 +132,7 @@ pub(crate) struct ResultAndTransition { pub transition: Vec<(Address, TransitionAccount)>, /// Rewards to miner. - pub miner_update: LazyUpdateValue, + pub rewards: u128, } /// Utility function for parallel execution using fork-join pattern. diff --git a/src/partition.rs b/src/partition.rs index ac8e9c4..879e0b5 100644 --- a/src/partition.rs +++ b/src/partition.rs @@ -8,6 +8,7 @@ use revm::{ primitives::{Address, EVMError, Env, ResultAndState, SpecId, TxEnv, TxKind}, DatabaseRef, EvmBuilder, }; +use revm_primitives::db::Database; use std::{ collections::BTreeSet, sync::Arc, @@ -97,10 +98,11 @@ where /// validation process. pub(crate) fn execute(&mut self) { let start = Instant::now(); + let coinbase = self.env.block.coinbase; let mut evm = EvmBuilder::default() .with_db(&mut self.partition_db) .with_spec_id(self.spec_id) - .with_env(Box::new(self.env.clone())) + .with_env(Box::new(std::mem::take(&mut self.env))) .build(); #[allow(invalid_reference_casting)] @@ -112,6 +114,16 @@ where if let Some(tx) = self.txs.get(txid) { *evm.tx_mut() = tx.clone(); + let mut raw_transfer = true; + if let Ok(Some(info)) = evm.db_mut().basic(tx.caller) { + raw_transfer &= info.is_empty_code_hash(); + } + if let TxKind::Call(to) = tx.transact_to { + if let Ok(Some(info)) = evm.db_mut().basic(to) { + raw_transfer &= info.is_empty_code_hash(); + } + } + evm.db_mut().raw_transfer = raw_transfer; } else { panic!("Wrong transactions ID"); } @@ -129,13 +141,12 @@ where } } if should_execute { - let result = evm.transact(); + let result = evm.transact_lazy_reward(); match result { Ok(result_and_state) => { - let ResultAndState { result, mut state } = result_and_state; - let mut read_set = evm.db_mut().take_read_set(); - let (write_set, miner_update, remove_miner) = - evm.db().generate_write_set(&mut state); + let ResultAndState { result, mut state, rewards } = result_and_state; + let read_set = evm.db_mut().take_read_set(); + let write_set = evm.db().generate_write_set(&mut state); // Check if the transaction can be skipped // skip_validation=true does not necessarily mean the transaction can skip @@ -144,19 +155,12 @@ where // if a transaction with a smaller TxID conflicts, // the states of subsequent transactions are invalid. let mut skip_validation = + !matches!(read_set.get(&LocationAndType::Basic(coinbase)), Some(None)); + skip_validation &= read_set.iter().all(|l| tx_states[txid].read_set.contains_key(l.0)); skip_validation &= write_set.iter().all(|l| tx_states[txid].write_set.contains(l)); - if remove_miner { - // remove miner's state if we handle rewards separately - state.remove(&self.coinbase); - } else { - // add miner to read set, because it's in write set. - // set miner's value to None to make this tx redo in next round if - // unconfirmed. - read_set.insert(LocationAndType::Basic(self.coinbase), None); - } // temporary commit to cache_db, to make use the remaining txs can read the // updated data let transition = evm.db_mut().temporary_commit(state); @@ -171,7 +175,7 @@ where execute_result: ResultAndTransition { result: Some(result), transition, - miner_update, + rewards, }, }; } diff --git a/src/scheduler.rs b/src/scheduler.rs index 25f8d8f..4c3cc98 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -2,20 +2,13 @@ use crate::{ fork_join_util, hint::ParallelExecutionHints, partition::PartitionExecutor, - storage::{LazyUpdateValue, SchedulerDB, State}, + storage::{SchedulerDB, State}, tx_dependency::{DependentTxsVec, TxDependency}, GrevmError, LocationAndType, ResultAndTransition, TransactionStatus, TxId, CPU_CORES, GREVM_RUNTIME, MAX_NUM_ROUND, }; -use fastrace::Span; -use std::{ - collections::BTreeSet, - ops::DerefMut, - sync::{Arc, RwLock}, - time::{Duration, Instant}, -}; - use ahash::{AHashMap as HashMap, AHashSet as HashSet}; +use fastrace::Span; use metrics::{counter, gauge}; use revm::{ primitives::{ @@ -23,6 +16,15 @@ use revm::{ }, CacheState, DatabaseCommit, DatabaseRef, EvmBuilder, }; +use std::{ + collections::BTreeSet, + ops::DerefMut, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, RwLock, + }, + time::{Duration, Instant}, +}; use tracing::info; struct ExecuteMetrics { @@ -355,7 +357,7 @@ where /// and there is no need to record the dependency and dependent relationships of these /// transactions. Thus achieving the purpose of pruning. #[fastrace::trace] - fn update_and_pruning_dependency(&mut self) { + fn update_and_pruning_dependency(&mut self, max_miner_involved_tx: TxId) { let num_finality_txs = self.num_finality_txs; if num_finality_txs == self.txs.len() { return; @@ -365,14 +367,20 @@ where for executor in &self.partition_executors { let executor = executor.read().unwrap(); for (txid, dep) in executor.assigned_txs.iter().zip(executor.tx_dependency.iter()) { - if *txid >= num_finality_txs { - // pruning the tx that is finality state - new_dependency[*txid - num_finality_txs] = dep - .clone() - .into_iter() - // pruning the dependent tx that is finality state - .filter(|dep_id| *dep_id >= num_finality_txs) - .collect(); + let txid = *txid; + if txid >= num_finality_txs && txid >= max_miner_involved_tx { + if txid == max_miner_involved_tx { + new_dependency[txid - num_finality_txs] = + (num_finality_txs..max_miner_involved_tx).collect(); + } else { + // pruning the tx that is finality state + new_dependency[txid - num_finality_txs] = dep + .clone() + .into_iter() + // pruning the dependent tx that is finality state + .filter(|dep_id| *dep_id >= num_finality_txs) + .collect(); + } } } } @@ -382,10 +390,13 @@ where /// Generate unconfirmed transactions, and find the continuous minimum TxID, /// which can be marked as finality transactions. #[fastrace::trace] - fn generate_unconfirmed_txs(&mut self) { + fn generate_unconfirmed_txs(&mut self) -> TxId { let num_partitions = self.num_partitions; let (end_skip_id, merged_write_set) = self.merge_write_set(); self.metrics.skip_validation_cnt.increment((end_skip_id - self.num_finality_txs) as u64); + let miner_location = LocationAndType::Basic(self.coinbase); + let min_tx_id = self.num_finality_txs; + let max_miner_involved_tx = AtomicUsize::new(min_tx_id); fork_join_util(num_partitions, Some(num_partitions), |_, _, part| { // Transaction validation process: // 1. For each transaction in each partition, traverse its read set and find the largest @@ -401,12 +412,18 @@ where let tx_states = unsafe { &mut *(&(*self.tx_states) as *const Vec as *mut Vec) }; - for txid in executor.assigned_txs.iter() { + for (index, txid) in executor.assigned_txs.iter().enumerate() { let txid = *txid; let mut conflict = tx_states[txid].tx_status == TransactionStatus::Conflict; let mut updated_dependencies = BTreeSet::new(); if txid >= end_skip_id { - for (location, _) in tx_states[txid].read_set.iter() { + for (location, balance) in tx_states[txid].read_set.iter() { + if *location == miner_location { + if balance.is_none() && txid - min_tx_id != index { + conflict = true; + max_miner_involved_tx.fetch_max(txid, Ordering::Relaxed); + } + } if let Some(written_txs) = merged_write_set.get(location) { if let Some(previous_txid) = written_txs.range(..txid).next_back() { // update dependencies: previous_txid <- txid @@ -430,6 +447,7 @@ where } } }); + max_miner_involved_tx.load(Ordering::Acquire) } /// Find the continuous minimum TxID, which can be marked as finality transactions. @@ -514,12 +532,12 @@ where #[allow(invalid_reference_casting)] let tx_states = unsafe { &mut *(&(*self.tx_states) as *const Vec as *mut Vec) }; - let mut miner_updates = Vec::with_capacity(finality_tx_cnt); let start_txid = self.num_finality_txs - finality_tx_cnt; let span = Span::enter_with_local_parent("database commit transitions"); + let mut rewards = 0; for txid in start_txid..self.num_finality_txs { - miner_updates.push(tx_states[txid].execute_result.miner_update.clone()); + rewards += tx_states[txid].execute_result.rewards; database .commit_transition(std::mem::take(&mut tx_states[txid].execute_result.transition)); self.results.push(tx_states[txid].execute_result.result.clone().unwrap()); @@ -532,7 +550,7 @@ where // set, and track the rewards for the miner for each transaction separately. // The miner’s account is only updated after validation by SchedulerDB.increment_balances database - .update_balances(vec![(self.coinbase, LazyUpdateValue::merge(miner_updates))]) + .increment_balances(vec![(self.coinbase, rewards)]) .map_err(|err| GrevmError::EvmError(EVMError::Database(err)))?; self.metrics.commit_transition_time.increment(start.elapsed().as_nanos() as u64); Ok(()) @@ -544,10 +562,10 @@ where #[fastrace::trace] fn validate_transactions(&mut self) -> Result<(), GrevmError> { let start = Instant::now(); - self.generate_unconfirmed_txs(); + let max_miner_involved_tx = self.generate_unconfirmed_txs(); let finality_tx_cnt = self.find_continuous_min_txid()?; // update and pruning tx dependencies - self.update_and_pruning_dependency(); + self.update_and_pruning_dependency(max_miner_involved_tx); self.commit_transition(finality_tx_cnt)?; self.metrics.validate_time.increment(start.elapsed().as_nanos() as u64); Ok(()) diff --git a/src/storage.rs b/src/storage.rs index 1abf47a..ea9c76d 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -18,61 +18,6 @@ use std::{ }, }; -/// LazyUpdateValue is used to update the balance of the miner's account. -/// The miner's reward is calculated by subtracting the previous balance from the current balance. -#[derive(Debug, Clone)] -pub(crate) enum LazyUpdateValue { - Increase(u128), - Decrease(u128), -} - -impl Default for LazyUpdateValue { - fn default() -> Self { - Self::Increase(0) - } -} - -/// Merge multiple LazyUpdateValue into one. -impl LazyUpdateValue { - pub(crate) fn merge(values: Vec) -> Self { - let mut value: u128 = 0; - let mut positive: bool = true; - for lazy_value in values { - match lazy_value { - Self::Increase(inc) => { - if positive { - value += inc; - } else { - if value > inc { - value -= inc - } else { - value = inc - value; - positive = true; - } - } - } - Self::Decrease(dec) => { - if positive { - if value > dec { - value -= dec; - } else { - value = dec - value; - positive = false; - } - } else { - value += dec; - } - } - } - } - if positive { - Self::Increase(value) - } else { - Self::Decrease(value) - } - } -} - trait ParallelBundleState { fn parallel_apply_transitions_and_create_reverts( &mut self, @@ -158,6 +103,7 @@ impl ParallelBundleState for BundleState { } } +/// State that manages state transitions and caching #[derive(Debug)] pub struct State { /// Cache the committed data of finality txns and the read-only data during execution after @@ -272,26 +218,24 @@ where } } - /// The miner's reward is calculated by subtracting the previous balance from the current + /// The miner's rewards is calculated by subtracting the previous balance from the current /// balance. and should add to the miner's account after each round of execution for /// finality transactions. - pub(crate) fn update_balances( + pub(crate) fn increment_balances( &mut self, - balances: impl IntoIterator, + balances: impl IntoIterator, ) -> Result<(), DB::Error> { // make transition and update cache state let mut transitions = Vec::new(); - for (address, update) in balances { - let cache_account = self.load_cache_account(address)?; - let mut info = cache_account.account_info().unwrap_or_default(); - let new_balance = match update { - LazyUpdateValue::Increase(value) => info.balance.saturating_add(U256::from(value)), - LazyUpdateValue::Decrease(value) => info.balance.saturating_sub(U256::from(value)), - }; - if info.balance != new_balance { - info.balance = new_balance; - transitions.push((address, cache_account.change(info, Default::default()))); + for (address, balance) in balances { + if balance == 0 { + continue; } + let original_account = self.load_cache_account(address)?; + transitions.push(( + address, + original_account.increment_balance(balance).expect("Balance is not zero"), + )) } // append transition if let Some(s) = self.state.transition_state.as_mut() { @@ -462,6 +406,8 @@ pub(crate) struct PartitionDB { /// Record the read set of current tx, will be consumed after the execution of each tx tx_read_set: AHashMap>, + + pub raw_transfer: bool, } impl PartitionDB { @@ -472,6 +418,7 @@ impl PartitionDB { scheduler_db, block_hashes: BTreeMap::new(), tx_read_set: AHashMap::new(), + raw_transfer: false, } } @@ -483,12 +430,7 @@ impl PartitionDB { /// Generate the write set after evm.transact() for each tx /// The write set includes the locations of the basic account, code, and storage slots that have /// been modified. Returns the write set(exclude miner) and the miner's rewards. - pub(crate) fn generate_write_set( - &self, - changes: &mut EvmState, - ) -> (LocationSet, LazyUpdateValue, bool) { - let mut miner_update = LazyUpdateValue::default(); - let mut remove_miner = true; + pub(crate) fn generate_write_set(&self, changes: &mut EvmState) -> LocationSet { let mut write_set = AHashSet::new(); for (address, account) in &mut *changes { if account.is_selfdestructed() { @@ -502,37 +444,6 @@ impl PartitionDB { continue; } - // Lazy update miner's balance - let mut miner_updated = false; - if self.coinbase == *address { - let add_nonce = match self.cache.accounts.get(address) { - Some(miner) => match miner.account.as_ref() { - Some(miner) => { - if account.info.balance >= miner.info.balance { - miner_update = LazyUpdateValue::Increase( - (account.info.balance - miner.info.balance).to(), - ); - } else { - miner_update = LazyUpdateValue::Decrease( - (miner.info.balance - account.info.balance).to(), - ); - } - account.info.balance = miner.info.balance; - account.info.nonce - miner.info.nonce - } - // LoadedNotExisting - None => { - miner_update = LazyUpdateValue::Increase(account.info.balance.to()); - account.info.balance = U256::ZERO; - account.info.nonce - } - }, - None => panic!("Miner should be cached"), - }; - miner_updated = true; - remove_miner = add_nonce == 0 && account.changed_storage_slots().count() == 0; - } - // If the account is touched, it means that the account's state has been modified // during the transaction. This includes changes to the account's balance, nonce, // or code. We need to track these changes to ensure the correct state is committed @@ -556,8 +467,7 @@ impl PartitionDB { new_contract_account = has_code; true } - } && !miner_updated - { + } { write_set.insert(LocationAndType::Basic(*address)); } if new_contract_account { @@ -569,7 +479,7 @@ impl PartitionDB { write_set.insert(LocationAndType::Storage(*address, *slot)); } } - (write_set, miner_update, remove_miner) + write_set } /// Temporary commit the state change after evm.transact() for each tx @@ -663,8 +573,10 @@ where balance = Some(info.balance); } } - if address != self.coinbase { - self.tx_read_set.insert(LocationAndType::Basic(address), balance); + if address == self.coinbase && !self.raw_transfer { + self.tx_read_set.entry(LocationAndType::Basic(address)).or_insert(None); + } else { + self.tx_read_set.entry(LocationAndType::Basic(address)).or_insert(balance); } result } @@ -696,7 +608,7 @@ where if let Ok(value) = &result { slot_value = Some(value.clone()); } - self.tx_read_set.insert(LocationAndType::Storage(address, index), slot_value); + self.tx_read_set.entry(LocationAndType::Storage(address, index)).or_insert(slot_value); result } diff --git a/tests/native_transfers.rs b/tests/native_transfers.rs index 5f8dcab..dd03da1 100644 --- a/tests/native_transfers.rs +++ b/tests/native_transfers.rs @@ -348,3 +348,82 @@ fn native_transfer_with_beneficiary() { .collect(), ); } + +#[test] +fn native_transfer_with_beneficiary_enough() { + let block_size = 20; // number of transactions + let accounts = common::mock_block_accounts(START_ADDRESS, block_size); + let db = InMemoryDB::new(accounts, Default::default(), Default::default()); + let mut txs: Vec = (START_ADDRESS..START_ADDRESS + block_size) + .map(|i| { + let address = Address::from(U160::from(i)); + TxEnv { + caller: address, + transact_to: TransactTo::Call(address), + value: U256::from(100), + gas_limit: common::TRANSFER_GAS_LIMIT, + gas_price: U256::from(1), + nonce: None, + ..TxEnv::default() + } + }) + .collect(); + let start_address = Address::from(U160::from(START_ADDRESS)); + let miner_address = Address::from(U160::from(MINER_ADDRESS)); + // start => miner + txs.push(TxEnv { + caller: start_address, + transact_to: TransactTo::Call(miner_address), + value: U256::from(100000), + gas_limit: common::TRANSFER_GAS_LIMIT, + gas_price: U256::from(1), + nonce: Some(2), + ..TxEnv::default() + }); + // miner => start + txs.push(TxEnv { + caller: miner_address, + transact_to: TransactTo::Call(start_address), + value: U256::from(1), + gas_limit: common::TRANSFER_GAS_LIMIT, + gas_price: U256::from(1), + nonce: Some(1), + ..TxEnv::default() + }); + // miner => start + txs.push(TxEnv { + caller: miner_address, + transact_to: TransactTo::Call(start_address), + value: U256::from(1), + gas_limit: common::TRANSFER_GAS_LIMIT, + gas_price: U256::from(1), + nonce: Some(2), + ..TxEnv::default() + }); + // miner => miner + txs.push(TxEnv { + caller: miner_address, + transact_to: TransactTo::Call(miner_address), + value: U256::from(1), + gas_limit: common::TRANSFER_GAS_LIMIT, + gas_price: U256::from(1), + nonce: Some(3), + ..TxEnv::default() + }); + common::compare_evm_execute( + db, + txs, + true, + [ + ("grevm.parallel_round_calls", DebugValue::Counter(1)), + ("grevm.sequential_execute_calls", DebugValue::Counter(0)), + ("grevm.parallel_tx_cnt", DebugValue::Counter(24 as u64)), + ("grevm.conflict_tx_cnt", DebugValue::Counter(0)), + ("grevm.unconfirmed_tx_cnt", DebugValue::Counter(0)), + ("grevm.reusable_tx_cnt", DebugValue::Counter(0)), + ("grevm.skip_validation_cnt", DebugValue::Counter(24)), + ] + .into_iter() + .collect(), + ); +}