Skip to content

Commit

Permalink
refactor the evm_execute code of scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard1048576 committed Oct 24, 2024
1 parent 46ff319 commit 14c19c8
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 32 deletions.
74 changes: 44 additions & 30 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::Debug;
use std::ops::DerefMut;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -231,6 +232,8 @@ where
{
/// Creates a new GrevmScheduler instance.
pub fn new(spec_id: SpecId, env: Env, db: DB, txs: Vec<TxEnv>) -> Self {
println!("Create new GrevmScheduler SpecId={:?}, txs.len()={:?}", spec_id, txs.len());

let coinbase = env.block.coinbase;
let num_partitions = *CPU_CORES * 2 + 1; // 2 * cpu + 1 for initial partition number
let num_txs = txs.len();
Expand Down Expand Up @@ -609,39 +612,42 @@ where
#[fastrace::trace]
fn evm_execute(
&mut self,
force_sequential: Option<bool>,
with_hints: bool,
num_partitions: Option<usize>,
) -> Result<ExecuteOutput, GrevmError<DB::Error>> {
if with_hints {
force_sequential: bool,
parse_hints_with_txs: bool,
) -> Result<ExecuteOutput, GrevmError<DB::Error>>
where
<DB as DatabaseRef>::Error: Debug,
{
if parse_hints_with_txs {
self.parse_hints();
}
if let Some(num_partitions) = num_partitions {
self.num_partitions = num_partitions;
}

self.metrics.total_tx_cnt.increment(self.txs.len() as u64);
let force_parallel = !force_sequential.unwrap_or(true); // adaptive false
let force_sequential = force_sequential.unwrap_or(false); // adaptive false

if self.txs.len() < self.num_partitions && !force_parallel {
self.execute_remaining_sequential()?;
return Ok(self.build_output());
}

if !force_sequential {
let mut round = 0;
while round < MAX_NUM_ROUND {
if self.num_finality_txs < self.txs.len() {
self.partition_transactions();
if self.num_partitions == 1 && !force_parallel {
break;
}
round += 1;
self.round_execute()?;
} else {
if self.num_finality_txs >= self.txs.len() {
assert_eq!(self.num_finality_txs, self.txs.len());
info!("Round {:?} touch the finality!", round);
break;
}
// partition or repartition the txs
self.partition_transactions();
if self.num_partitions == 1 && force_sequential {
break;
}

round += 1;
match self.round_execute() {
Ok(_) => println!("Round {:?} execute success, self.num_finality_txs={:?}",
round, self.num_partitions),
Err(err) => {
info!("Round {:?} execute failed which error={:?}", round, err);
// FIXME(gravity_richard): add more error code later
panic!()
},
}
}
self.metrics.parallel_tx_cnt.increment(self.num_finality_txs as u64);
}
Expand All @@ -655,21 +661,29 @@ where
}

/// Execute transactions in parallel.
pub fn parallel_execute(mut self) -> Result<ExecuteOutput, GrevmError<DB::Error>> {
self.evm_execute(None, true, None)
pub fn parallel_execute(mut self) -> Result<ExecuteOutput, GrevmError<DB::Error>>
where
<DB as DatabaseRef>::Error: Debug,
{
self.evm_execute(false, true)
}

/// Execute transactions parallelly with or without hints.
pub fn force_parallel_execute(
mut self,
with_hints: bool,
num_partitions: Option<usize>,
) -> Result<ExecuteOutput, GrevmError<DB::Error>> {
self.evm_execute(Some(false), with_hints, num_partitions)
) -> Result<ExecuteOutput, GrevmError<DB::Error>>
where
<DB as DatabaseRef>::Error: Debug,
{
self.evm_execute(false, with_hints)
}

/// Execute transactions sequentially.
pub fn force_sequential_execute(mut self) -> Result<ExecuteOutput, GrevmError<DB::Error>> {
self.evm_execute(Some(true), false, None)
pub fn force_sequential_execute(mut self) -> Result<ExecuteOutput, GrevmError<DB::Error>>
where
<DB as DatabaseRef>::Error: Debug,
{
self.evm_execute(true, false)
}
}
2 changes: 1 addition & 1 deletion tests/common/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub(crate) fn compare_evm_execute<DB>(
let start = Instant::now();
let parallel = GrevmScheduler::new(SpecId::LATEST, env.clone(), db.clone(), txs.clone());
// set determined partitions
parallel_result = parallel.force_parallel_execute(with_hints, Some(23));
parallel_result = parallel.force_parallel_execute(with_hints);
println!("Grevm parallel execute time: {}ms", start.elapsed().as_millis());

let snapshot = recorder.snapshotter().snapshot();
Expand Down
2 changes: 1 addition & 1 deletion tests/mainnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn test_execute_alloy(block: Block, db: InMemoryDB) {
let mut parallel_result = Err(GrevmError::UnreachableError(String::from("Init")));
metrics::with_local_recorder(&recorder, || {
let executor = GrevmScheduler::new(spec_id, env, db, txs);
parallel_result = executor.force_parallel_execute(true, Some(23));
parallel_result = executor.force_parallel_execute(true);

let snapshot = recorder.snapshotter().snapshot();
for (key, unit, desc, value) in snapshot.into_vec() {
Expand Down

0 comments on commit 14c19c8

Please sign in to comment.