Skip to content

Commit

Permalink
opt: async commit transitions and update caches
Browse files Browse the repository at this point in the history
  • Loading branch information
AshinGau committed Oct 31, 2024
1 parent ef3f2cb commit fbc54d0
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 70 deletions.
9 changes: 8 additions & 1 deletion src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use revm::{
};
use std::{
collections::BTreeSet,
sync::Arc,
sync::{mpsc::Sender, Arc},
time::{Duration, Instant},
};

Expand Down Expand Up @@ -50,6 +50,7 @@ where
/// allowing modification of transaction states during execution
tx_states: SharedTxStates,
txs: Arc<Vec<TxEnv>>,
pre_commit_sender: Sender<TxId>,

pub partition_db: PartitionDB<DB>,
pub assigned_txs: Vec<TxId>,
Expand All @@ -73,6 +74,7 @@ where
txs: Arc<Vec<TxEnv>>,
tx_states: SharedTxStates,
assigned_txs: Vec<TxId>,
pre_commit_sender: Sender<TxId>,
) -> Self {
let coinbase = env.block.coinbase;
let partition_db = PartitionDB::new(coinbase, scheduler_db);
Expand All @@ -85,6 +87,7 @@ where
txs,
partition_db,
assigned_txs,
pre_commit_sender,
error_txs: HashMap::new(),
tx_dependency: vec![],
metrics: Default::default(),
Expand Down Expand Up @@ -126,6 +129,7 @@ where
should_execute = false;
self.metrics.reusable_tx_cnt += 1;
tx_states[txid].tx_status = TransactionStatus::SkipValidation;
self.pre_commit_sender.send(txid).unwrap();
}
}
if should_execute {
Expand Down Expand Up @@ -174,6 +178,9 @@ where
miner_update,
},
};
if skip_validation {
self.pre_commit_sender.send(txid).unwrap();
}
}
Err(err) => {
// In a parallel execution environment, transactions might fail due to
Expand Down
136 changes: 101 additions & 35 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,8 @@ use crate::{
GrevmError, LocationAndType, ResultAndTransition, TransactionStatus, TxId, CPU_CORES,
GREVM_RUNTIME, MAX_NUM_ROUND,
};
use fastrace::Span;
use std::{
collections::BTreeSet,
ops::DerefMut,
sync::{Arc, RwLock},
time::{Duration, Instant},
};

use ahash::{AHashMap as HashMap, AHashSet as HashSet};
use fastrace::Span;
use metrics::{counter, gauge};
use revm::{
db::{states::bundle_state::BundleRetention, BundleState},
Expand All @@ -24,6 +17,18 @@ use revm::{
},
CacheState, DatabaseRef, EvmBuilder,
};
use std::{
cmp::Reverse,
collections::{BTreeSet, BinaryHeap},
ops::DerefMut,
sync::{
mpsc::{channel, Receiver, Sender},
Arc, Mutex, RwLock,
},
thread,
time::{Duration, Instant},
};
use tokio::task::JoinHandle;
use tracing::info;

struct ExecuteMetrics {
Expand Down Expand Up @@ -178,7 +183,7 @@ where
partition_executors: Vec<Arc<RwLock<PartitionExecutor<DB>>>>,
/// number of finality txs in the current round
num_finality_txs: usize,
results: Vec<ExecutionResult>,
results: Arc<Mutex<Vec<ExecutionResult>>>,

metrics: ExecuteMetrics,
}
Expand Down Expand Up @@ -257,11 +262,50 @@ where
partitioned_txs: vec![],
partition_executors: vec![],
num_finality_txs: 0,
results: Vec::with_capacity(num_txs),
results: Arc::new(Mutex::new(Vec::with_capacity(num_txs))),
metrics: Default::default(),
}
}

/// Spawns a task on `GREVM_RUNTIME` that commits finalized transaction
/// transitions to `database` as their `TxId`s arrive over `rx`, instead of
/// committing them all at once after the round ends.
///
/// Commits are applied strictly in ascending `TxId` order starting from
/// `num_finality_txs`: an out-of-order id is buffered in `commit_txs` until
/// every lower id has arrived. Receiving `tx_states.len()` is the EOF
/// signal and ends the loop; the loop also ends when all senders drop
/// (`recv()` returns `Err`).
///
/// Returns a handle resolving to `(next_uncommitted_txid, merged_miner_update)`;
/// the caller awaits it and asserts the id matches its own finality count.
fn async_commit(
    database: Arc<SchedulerDB<DB>>,
    tx_states: SharedTxStates,
    results: Arc<Mutex<Vec<ExecutionResult>>>,
    num_finality_txs: TxId,
    rx: Receiver<TxId>,
) -> JoinHandle<(TxId, LazyUpdateValue)> {
    GREVM_RUNTIME.spawn(async move {
        // SAFETY(review): both casts below turn a shared reference obtained
        // from an `Arc` into `&mut`, which is undefined behavior under Rust's
        // aliasing rules if any other reference to the same data is used
        // concurrently. The `#[allow(invalid_reference_casting)]` only
        // silences the lint — it does not make the cast sound. Presumably the
        // scheduler guarantees exclusive access to the committed prefix of
        // `tx_states` and to `database` while this task runs — TODO confirm.
        #[allow(invalid_reference_casting)]
        let database =
            unsafe { &mut *(&(*database) as *const SchedulerDB<DB> as *mut SchedulerDB<DB>) };
        #[allow(invalid_reference_casting)]
        let tx_states =
            unsafe { &mut *(&(*tx_states) as *const Vec<TxState> as *mut Vec<TxState>) };
        // NOTE(review): this std::sync::Mutex guard is held for the whole
        // receive loop, and `rx.recv()` below blocks; together they pin a
        // runtime worker thread for the duration of the round — confirm this
        // is intended and that nothing else locks `results` meanwhile.
        let mut results = results.lock().unwrap();
        let mut update_miner = LazyUpdateValue::default();

        // `tx_states.len()` doubles as the EOF sentinel sent by the scheduler.
        let eof_signal = tx_states.len();
        // One flag per not-yet-final tx: true once its id has been received.
        let mut commit_txs = vec![false; eof_signal - num_finality_txs];
        // Next TxId to commit; only advances over a contiguous ready prefix.
        let mut index = num_finality_txs;
        while let Ok(txid) = rx.recv() {
            if txid == eof_signal {
                break;
            }
            commit_txs[txid - num_finality_txs] = true;
            // Drain the contiguous run of ready transactions in order.
            while index < eof_signal && commit_txs[index - num_finality_txs] {
                // `take` moves the transition out, leaving a default in place.
                database.commit_transition(std::mem::take(
                    &mut tx_states[index].execute_result.transition,
                ));
                results.push(tx_states[index].execute_result.result.clone().unwrap());
                // Miner rewards are accumulated separately and applied once
                // by the caller (see commit_transition / update_balances).
                update_miner =
                    update_miner.merge(tx_states[index].execute_result.miner_update.clone());
                index += 1;
            }
        }
        (index, update_miner)
    })
}

/// Get the partitioned transactions by dependencies.
#[fastrace::trace]
pub(crate) fn partition_transactions(&mut self) {
Expand Down Expand Up @@ -289,6 +333,7 @@ where
fn round_execute(&mut self) -> Result<(), GrevmError<DB::Error>> {
self.metrics.parallel_execute_calls.increment(1);
self.partition_executors.clear();
let (tx, rx) = channel();
for partition_id in 0..self.num_partitions {
let executor = PartitionExecutor::new(
self.spec_id,
Expand All @@ -298,11 +343,19 @@ where
self.txs.clone(),
self.tx_states.clone(),
self.partitioned_txs[partition_id].clone(),
tx.clone(),
);
self.partition_executors.push(Arc::new(RwLock::new(executor)));
}

let start = Instant::now();
let commit_handle = Self::async_commit(
self.database.clone(),
self.tx_states.clone(),
self.results.clone(),
self.num_finality_txs,
rx,
);
GREVM_RUNTIME.block_on(async {
let mut tasks = vec![];
for executor in &self.partition_executors {
Expand All @@ -313,7 +366,7 @@ where
});
self.metrics.parallel_execute_time.increment(start.elapsed().as_nanos() as u64);

self.validate_transactions()
self.validate_transactions(tx, commit_handle)
}

/// Merge write set after each round
Expand Down Expand Up @@ -375,7 +428,7 @@ where
/// Generate unconfirmed transactions, and find the continuous minimum TxID,
/// which can be marked as finality transactions.
#[fastrace::trace]
fn generate_unconfirmed_txs(&mut self) {
fn generate_unconfirmed_txs(&mut self, commit_sender: Sender<TxId>) {
let num_partitions = self.num_partitions;
let (end_skip_id, merged_write_set) = self.merge_write_set();
self.metrics.skip_validation_cnt.increment((end_skip_id - self.num_finality_txs) as u64);
Expand Down Expand Up @@ -414,6 +467,11 @@ where
}
}
}
if !conflict && tx_states[txid].tx_status != TransactionStatus::SkipValidation {
// when tx_status == SkipValidation, txid has already been sent by
// the PartitionExecutor
commit_sender.send(txid).unwrap();
}
}
executor.tx_dependency.push(updated_dependencies);
tx_states[txid].tx_status = if conflict {
Expand All @@ -423,6 +481,9 @@ where
}
}
});
// send `self.tx_states.len()` as the EOF signal
commit_sender.send(self.tx_states.len()).unwrap();
drop(commit_sender);
}

/// Find the continuous minimum TxID, which can be marked as finality transactions.
Expand Down Expand Up @@ -485,7 +546,11 @@ where

/// Commit the transition of the finality transactions, and update the miner's rewards.
#[fastrace::trace]
fn commit_transition(&mut self, finality_tx_cnt: usize) -> Result<(), GrevmError<DB::Error>> {
fn commit_transition(
&mut self,
finality_tx_cnt: usize,
commit_handle: JoinHandle<(TxId, LazyUpdateValue)>,
) -> Result<(), GrevmError<DB::Error>> {
let start = Instant::now();
let partition_state: Vec<CacheState> = self
.partition_executors
Expand All @@ -498,34 +563,29 @@ where

// MUST drop the `PartitionExecutor::scheduler_db` before get mut
self.partition_executors.clear();

let span = Span::enter_with_local_parent("async commit transitions");
let (async_commit_id, update_miner) =
GREVM_RUNTIME.block_on(async { commit_handle.await.unwrap() });
assert_eq!(async_commit_id, self.num_finality_txs);
drop(span);

// there's a database reference held by commit_handle,
// so we must wait for the async commit operation to finish before getting a mutable reference.
let database = Arc::get_mut(&mut self.database).unwrap();
database.merge_pre_commit();
if self.num_finality_txs < self.txs.len() {
// Merging these states is only useful when there is a next round of execution.
Self::merge_not_modified_state(&mut database.cache, partition_state);
}

#[allow(invalid_reference_casting)]
let tx_states =
unsafe { &mut *(&(*self.tx_states) as *const Vec<TxState> as *mut Vec<TxState>) };
let mut miner_updates = Vec::with_capacity(finality_tx_cnt);
let start_txid = self.num_finality_txs - finality_tx_cnt;

let span = Span::enter_with_local_parent("database commit transitions");
for txid in start_txid..self.num_finality_txs {
miner_updates.push(tx_states[txid].execute_result.miner_update.clone());
database
.commit_transition(std::mem::take(&mut tx_states[txid].execute_result.transition));
self.results.push(tx_states[txid].execute_result.result.clone().unwrap());
}
drop(span);

// Each transaction updates three accounts: from, to, and coinbase.
// If every tx updates the coinbase account, it will cause conflicts across all txs.
// Therefore, we handle miner rewards separately. We don't record miner’s address in r/w
// set, and track the rewards for the miner for each transaction separately.
// The miner’s account is only updated after validation by SchedulerDB.increment_balances
database
.update_balances(vec![(self.coinbase, LazyUpdateValue::merge(miner_updates))])
.update_balances(vec![(self.coinbase, update_miner)])
.map_err(|err| GrevmError::EvmError(EVMError::Database(err)))?;
self.metrics.commit_transition_time.increment(start.elapsed().as_nanos() as u64);
Ok(())
Expand All @@ -535,13 +595,17 @@ where
/// Because after each round execution, the read-write set is no longer updated.
/// We can check in parallel whether the read set is out of bounds.
#[fastrace::trace]
fn validate_transactions(&mut self) -> Result<(), GrevmError<DB::Error>> {
fn validate_transactions(
&mut self,
commit_sender: Sender<TxId>,
commit_handle: JoinHandle<(TxId, LazyUpdateValue)>,
) -> Result<(), GrevmError<DB::Error>> {
let start = Instant::now();
self.generate_unconfirmed_txs();
self.generate_unconfirmed_txs(commit_sender);
let finality_tx_cnt = self.find_continuous_min_txid()?;
// update and pruning tx dependencies
self.update_and_pruning_dependency();
self.commit_transition(finality_tx_cnt)?;
self.commit_transition(finality_tx_cnt, commit_handle)?;
self.metrics.validate_time.increment(start.elapsed().as_nanos() as u64);
Ok(())
}
Expand All @@ -553,7 +617,7 @@ where
for partition in partition_state {
// merge account state that is not modified
for (address, account) in partition.accounts {
if account.status.is_not_modified() && state.accounts.get(&address).is_none() {
if !state.accounts.contains_key(&address) && account.status.is_not_modified() {
state.accounts.insert(address, account);
}
}
Expand Down Expand Up @@ -581,6 +645,7 @@ where
.with_spec_id(self.spec_id)
.with_env(Box::new(self.env.clone()))
.build();
let mut results = self.results.lock().unwrap();
for txid in self.num_finality_txs..self.txs.len() {
if let Some(tx) = self.txs.get(txid) {
*evm.tx_mut() = tx.clone();
Expand All @@ -590,7 +655,7 @@ where
match evm.transact() {
Ok(result_and_state) => {
evm.db_mut().commit(result_and_state.state);
self.results.push(result_and_state.result);
results.push(result_and_state.result);
}
Err(err) => return Err(GrevmError::EvmError(err)),
}
Expand All @@ -606,9 +671,10 @@ where
self.partition_executors.clear();
let database = Arc::get_mut(&mut self.database).unwrap();
database.merge_transitions(BundleRetention::Reverts);
let mut results = self.results.lock().unwrap();
let output = ExecuteOutput {
state: std::mem::take(&mut database.bundle_state),
results: std::mem::take(&mut self.results),
results: std::mem::take(&mut results),
};
self.metrics.build_output_time.increment(start.elapsed().as_nanos() as u64);
output
Expand Down
Loading

0 comments on commit fbc54d0

Please sign in to comment.