diff --git a/src/partition.rs b/src/partition.rs index 879e0b5..39f1f84 100644 --- a/src/partition.rs +++ b/src/partition.rs @@ -14,6 +14,8 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; +use std::sync::atomic::Ordering; +use crate::scheduler::{RewardsAccumulator, RewardsAccumulators}; #[derive(Default)] pub(crate) struct PartitionMetrics { @@ -52,6 +54,8 @@ where tx_states: SharedTxStates, txs: Arc>, + rewards_accumulators: Arc, + pub partition_db: PartitionDB, pub assigned_txs: Vec, @@ -70,13 +74,14 @@ where spec_id: SpecId, partition_id: PartitionId, env: Env, + rewards_accumulators: Arc, scheduler_db: Arc>, txs: Arc>, tx_states: SharedTxStates, assigned_txs: Vec, ) -> Self { let coinbase = env.block.coinbase; - let partition_db = PartitionDB::new(coinbase, scheduler_db); + let partition_db = PartitionDB::new(coinbase, scheduler_db, rewards_accumulators.clone()); Self { spec_id, env, @@ -84,6 +89,7 @@ where partition_id, tx_states, txs, + rewards_accumulators, partition_db, assigned_txs, error_txs: HashMap::new(), @@ -114,6 +120,7 @@ where if let Some(tx) = self.txs.get(txid) { *evm.tx_mut() = tx.clone(); + evm.db_mut().current_txid = txid; let mut raw_transfer = true; if let Ok(Some(info)) = evm.db_mut().basic(tx.caller) { raw_transfer &= info.is_empty_code_hash(); @@ -124,11 +131,13 @@ where } } evm.db_mut().raw_transfer = raw_transfer; + evm.db_mut().take_read_set(); // clean read set } else { panic!("Wrong transactions ID"); } // If the transaction is unconfirmed, it may not require repeated execution let mut should_execute = true; + let mut update_rewards = 0; if tx_states[txid].tx_status == TransactionStatus::Unconfirmed { if evm.db_mut().check_read_set(&tx_states[txid].read_set) { // Unconfirmed transactions from the previous round might not need to be @@ -137,6 +146,7 @@ where evm.db_mut().temporary_commit_transition(transition); should_execute = false; self.metrics.reusable_tx_cnt += 1; + update_rewards = tx_states[txid].execute_result.rewards; tx_states[txid].tx_status = TransactionStatus::SkipValidation; } } @@ -178,6 +188,7 @@ where rewards, }, }; + update_rewards = rewards; } Err(err) => { // In a parallel execution environment, transactions might fail due to @@ -192,10 +203,10 @@ where let mut read_set = evm.db_mut().take_read_set(); // update write set with the caller and transact_to let mut write_set = HashSet::new(); - read_set.insert(LocationAndType::Basic(evm.tx().caller), None); + read_set.entry(LocationAndType::Basic(evm.tx().caller)).or_insert(None); write_set.insert(LocationAndType::Basic(evm.tx().caller)); if let TxKind::Call(to) = evm.tx().transact_to { - read_set.insert(LocationAndType::Basic(to), None); + read_set.entry(LocationAndType::Basic(to)).or_insert(None); write_set.insert(LocationAndType::Basic(to)); } @@ -209,7 +220,25 @@ where } } } + if !self.rewards_accumulators.is_empty() { + Self::report_rewards(self.rewards_accumulators.clone(), txid, update_rewards); + } } self.metrics.execute_time = start.elapsed(); } + + fn report_rewards(rewards_accumulators: Arc, txid: TxId, rewards: u128) { + if !rewards_accumulators.is_empty() { + for (_, accumulator) in rewards_accumulators.range((txid + 1)..).next() { + let counter = accumulator.accumulate_counter.fetch_add(1, Ordering::Release); + accumulator.accumulate_rewards.fetch_add(rewards, Ordering::Release); + if counter >= accumulator.accumulate_num { + panic!("to many reward records!"); + } + if counter == accumulator.accumulate_num - 1 { + accumulator.notifier.notify_one(); + } + } + } + } } diff --git a/src/scheduler.rs b/src/scheduler.rs index d131879..5014867 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -25,6 +25,12 @@ use std::{ }, time::{Duration, Instant}, }; +use std::collections::BTreeMap; +use std::sync::Mutex; +use atomic::Atomic; +use dashmap::DashSet; +use revm::interpreter::instructions::host::balance; +use tokio::sync::Notify; use tracing::info; struct ExecuteMetrics { @@ -132,6 +138,26 @@ impl TxState { } } +pub(crate) struct RewardsAccumulator { + pub accumulate_num: usize, + pub accumulate_counter: AtomicUsize, + pub accumulate_rewards: Atomic, + pub notifier: Arc, +} + +impl RewardsAccumulator { + pub(crate) fn new(accumulate_num: usize) -> Self { + Self { + accumulate_num, + accumulate_counter: AtomicUsize::new(0), + accumulate_rewards: Atomic::::new(0), + notifier: Arc::new(Notify::new()), + } + } +} + +pub(crate) type RewardsAccumulators = BTreeMap; + /// A shared reference to a vector of transaction states. /// Used to share the transaction states between the partition executors. /// Since the state of a transaction is not modified by multiple threads simultaneously, @@ -176,6 +202,8 @@ where num_finality_txs: usize, results: Vec, + rewards_accumulators: Arc, + metrics: ExecuteMetrics, } @@ -262,6 +290,7 @@ where partition_executors: vec![], num_finality_txs: 0, results: Vec::with_capacity(num_txs), + rewards_accumulators: Arc::new(RewardsAccumulators::new()), metrics: Default::default(), } } @@ -298,6 +327,7 @@ where self.spec_id, partition_id, self.env.clone(), + self.rewards_accumulators.clone(), self.database.clone(), self.txs.clone(), self.tx_states.clone(), @@ -357,7 +387,7 @@ where /// and there is no need to record the dependency and dependent relationships of these /// transactions. Thus achieving the purpose of pruning. #[fastrace::trace] - fn update_and_pruning_dependency(&mut self, max_miner_involved_tx: TxId) { + fn update_and_pruning_dependency(&mut self) { let num_finality_txs = self.num_finality_txs; if num_finality_txs == self.txs.len() { return; @@ -368,19 +398,14 @@ where let executor = executor.read().unwrap(); for (txid, dep) in executor.assigned_txs.iter().zip(executor.tx_dependency.iter()) { let txid = *txid; - if txid >= num_finality_txs && txid >= max_miner_involved_tx { - if txid == max_miner_involved_tx { - new_dependency[txid - num_finality_txs] = - (num_finality_txs..max_miner_involved_tx).collect(); - } else { - // pruning the tx that is finality state - new_dependency[txid - num_finality_txs] = dep - .clone() - .into_iter() - // pruning the dependent tx that is finality state - .filter(|dep_id| *dep_id >= num_finality_txs) - .collect(); - } + if txid >= num_finality_txs{ + // pruning the tx that is finality state + new_dependency[txid - num_finality_txs] = dep + .clone() + .into_iter() + // pruning the dependent tx that is finality state + .filter(|dep_id| *dep_id >= num_finality_txs) + .collect(); } } } @@ -390,13 +415,12 @@ where /// Generate unconfirmed transactions, and find the continuous minimum TxID, /// which can be marked as finality transactions. #[fastrace::trace] - fn generate_unconfirmed_txs(&mut self) -> TxId { + fn generate_unconfirmed_txs(&mut self) -> Vec { let num_partitions = self.num_partitions; let (end_skip_id, merged_write_set) = self.merge_write_set(); self.metrics.skip_validation_cnt.increment((end_skip_id - self.num_finality_txs) as u64); let miner_location = LocationAndType::Basic(self.coinbase); - let min_tx_id = self.num_finality_txs; - let max_miner_involved_tx = AtomicUsize::new(min_tx_id); + let miner_involved_txs = DashSet::new(); fork_join_util(num_partitions, Some(num_partitions), |_, _, part| { // Transaction validation process: // 1. For each transaction in each partition, traverse its read set and find the largest @@ -412,17 +436,17 @@ where let tx_states = unsafe { &mut *(&(*self.tx_states) as *const Vec as *mut Vec) }; - for (index, txid) in executor.assigned_txs.iter().enumerate() { + for txid in executor.assigned_txs.iter() { let txid = *txid; let mut conflict = tx_states[txid].tx_status == TransactionStatus::Conflict; let mut updated_dependencies = BTreeSet::new(); if txid >= end_skip_id { for (location, balance) in tx_states[txid].read_set.iter() { - if *location == miner_location { - if balance.is_none() && txid - min_tx_id != index { + if *location == miner_location && balance.is_none(){ + if txid != self.num_finality_txs && !self.rewards_accumulators.contains_key(&txid) { conflict = true; - max_miner_involved_tx.fetch_max(txid, Ordering::Relaxed); } + miner_involved_txs.insert(txid); } if let Some(written_txs) = merged_write_set.get(location) { if let Some(previous_txid) = written_txs.range(..txid).next_back() { @@ -447,7 +471,7 @@ where } } }); - max_miner_involved_tx.load(Ordering::Acquire) + miner_involved_txs.into_iter().collect() } /// Find the continuous minimum TxID, which can be marked as finality transactions. @@ -536,8 +560,14 @@ where let span = Span::enter_with_local_parent("database commit transitions"); let mut rewards = 0; + let rewards_start_txid = match self.rewards_accumulators.range(..self.num_finality_txs).next_back() { + Some((txid, _)) => *txid, + None => start_txid, + }; for txid in start_txid..self.num_finality_txs { - rewards += tx_states[txid].execute_result.rewards; + if txid >= rewards_start_txid { + rewards += tx_states[txid].execute_result.rewards; + } database .commit_transition(std::mem::take(&mut tx_states[txid].execute_result.transition)); self.results.push(tx_states[txid].execute_result.result.clone().unwrap()); @@ -562,11 +592,18 @@ where #[fastrace::trace] fn validate_transactions(&mut self) -> Result<(), GrevmError> { let start = Instant::now(); - let max_miner_involved_tx = self.generate_unconfirmed_txs(); + let miner_involved_txs = self.generate_unconfirmed_txs(); let finality_tx_cnt = self.find_continuous_min_txid()?; // update and pruning tx dependencies - self.update_and_pruning_dependency(max_miner_involved_tx); + self.update_and_pruning_dependency(); self.commit_transition(finality_tx_cnt)?; + let mut rewards_accumulators = RewardsAccumulators::new(); + for txid in miner_involved_txs { + if txid > self.num_finality_txs { + rewards_accumulators.insert(txid, RewardsAccumulator::new(txid - self.num_finality_txs)); + } + } + self.rewards_accumulators = Arc::new(rewards_accumulators); self.metrics.validate_time.increment(start.elapsed().as_nanos() as u64); Ok(()) } diff --git a/src/storage.rs b/src/storage.rs index c0e9052..d71b8dc 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,4 +1,4 @@ -use crate::{fork_join_util, LocationAndType, LocationSet}; +use crate::{fork_join_util, GREVM_RUNTIME, LocationAndType, LocationSet, TxId}; use ahash::{AHashMap, AHashSet}; use fastrace::Span; use revm::{ @@ -17,6 +17,8 @@ use std::{ Arc, Mutex, }, }; +use crate::scheduler::RewardsAccumulators; +use tokio::time::{timeout, Duration}; trait ParallelBundleState { fn parallel_apply_transitions_and_create_reverts( @@ -412,18 +414,24 @@ pub(crate) struct PartitionDB { /// Record the read set of current tx, will be consumed after the execution of each tx tx_read_set: AHashMap>, + pub current_txid: TxId, pub raw_transfer: bool, + rewards_accumulators: Arc, + accumulated_rewards: u128, } impl PartitionDB { - pub(crate) fn new(coinbase: Address, scheduler_db: Arc>) -> Self { + pub(crate) fn new(coinbase: Address, scheduler_db: Arc>, rewards_accumulators: Arc) -> Self { Self { coinbase, cache: CacheState::new(scheduler_db.state.cache.has_state_clear), scheduler_db, block_hashes: BTreeMap::new(), tx_read_set: AHashMap::new(), + current_txid: 0, raw_transfer: false, + rewards_accumulators, + accumulated_rewards: 0, } } @@ -556,7 +564,7 @@ where fn basic(&mut self, address: Address) -> Result, Self::Error> { // 1. read from internal cache - let result = match self.cache.accounts.entry(address) { + let mut result = match self.cache.accounts.entry(address) { hash_map::Entry::Vacant(entry) => { // 2. read initial state of this round from scheduler cache if let Some(account) = self.scheduler_db.state.cache.accounts.get(&address) { @@ -569,19 +577,44 @@ where } hash_map::Entry::Occupied(entry) => Ok(entry.get().account_info()), }; - let mut balance = None; - if let Ok(account) = &result { + + let mut inc_rewards = 0; + if address == self.coinbase && !self.raw_transfer { + if let Some(accumulator) = self.rewards_accumulators.get(&self.current_txid) { + let mut wait_time_ms = 0; + let loop_wait_time = 10; // 10ms + let wait_time_out = 10_1000; // 10s + while accumulator.accumulate_counter.load(Ordering::Acquire) < accumulator.accumulate_num { + let notifier = accumulator.notifier.clone(); + tokio::task::block_in_place(move || { + GREVM_RUNTIME.block_on(async move { + if !timeout(Duration::from_millis(loop_wait_time), notifier.notified()).await.is_ok() { + wait_time_ms += loop_wait_time; + } + }) + }); + if wait_time_ms > wait_time_out { + panic!("wait to much time for the accumulated rewards of account({:?})", address); + } + } + let new_rewards = accumulator.accumulate_rewards.load(Ordering::Acquire); + inc_rewards = new_rewards - self.accumulated_rewards; + self.accumulated_rewards = new_rewards; + } + self.tx_read_set.entry(LocationAndType::Basic(address)).or_insert(None); + } + let mut balance = U256::ZERO; + if let Ok(account) = &mut result { if let Some(info) = account { if !info.is_empty_code_hash() { self.tx_read_set.insert(LocationAndType::Code(address), None); } - balance = Some(info.balance); + info.balance = info.balance.saturating_add(U256::from(inc_rewards)); + balance = info.balance; } } - if address == self.coinbase && !self.raw_transfer { - self.tx_read_set.entry(LocationAndType::Basic(address)).or_insert(None); - } else { - self.tx_read_set.entry(LocationAndType::Basic(address)).or_insert(balance); + if address != self.coinbase || self.raw_transfer { + self.tx_read_set.entry(LocationAndType::Basic(address)).or_insert(Some(balance)); } result }