refactor: adjust the interface to adapt to reth (#27)
- adjust the interface to adapt to reth
- call `block_on` via `block_in_place` when already in a tokio async context (see the sketch below)
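The second bullet corresponds to the `tokio::task::block_in_place` wrapper added in `src/scheduler.rs` below. A minimal sketch of that pattern, assuming a shared multi-threaded runtime analogous to the crate's `GREVM_RUNTIME` (the function name and the work inside the async block are illustrative only, not the scheduler's actual logic):

```rust
use tokio::runtime::Runtime;

// `block_in_place` tells the multi-threaded tokio runtime that this worker
// thread is about to block, so `block_on` can then drive another runtime even
// though we are already inside an async context (e.g. reth's pipeline).
// Note that `block_in_place` itself panics on a current-thread runtime.
fn run_blocking_on(runtime: &Runtime) -> u64 {
    tokio::task::block_in_place(|| {
        runtime.block_on(async {
            // In the scheduler this is where the per-partition executor
            // tasks are spawned on the runtime and joined.
            let handle = runtime.spawn(async { 21u64 * 2 });
            handle.await.expect("task panicked")
        })
    })
}
```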
nekomoto911 authored Nov 22, 2024
1 parent f990412 commit c126ad1
Showing 8 changed files with 167 additions and 132 deletions.
6 changes: 4 additions & 2 deletions benches/gigagas.rs
@@ -62,11 +62,12 @@ fn bench(c: &mut Criterion, name: &str, db: InMemoryDB, txs: Vec<TxEnv>) {
let root = Span::root(format!("{name} Grevm Parallel"), SpanContext::random());
let _guard = root.set_local_parent();
metrics::with_local_recorder(&recorder, || {
let executor = GrevmScheduler::new(
let mut executor = GrevmScheduler::new(
black_box(SpecId::LATEST),
black_box(env.clone()),
black_box(db.clone()),
black_box(txs.clone()),
None,
);
let _ = executor.parallel_execute();

@@ -92,11 +93,12 @@ fn bench(c: &mut Criterion, name: &str, db: InMemoryDB, txs: Vec<TxEnv>) {
let root = Span::root(format!("{name} Grevm Sequential"), SpanContext::random());
let _guard = root.set_local_parent();
metrics::with_local_recorder(&recorder, || {
let executor = GrevmScheduler::new(
let mut executor = GrevmScheduler::new(
black_box(SpecId::LATEST),
black_box(env.clone()),
black_box(db.clone()),
black_box(txs.clone()),
None,
);
let _ = executor.force_sequential_execute();

6 changes: 4 additions & 2 deletions benches/mainnet.rs
@@ -53,23 +53,25 @@ fn benchmark_mainnet(c: &mut Criterion) {

group.bench_function("Grevm Parallel", |b| {
b.iter(|| {
let executor = GrevmScheduler::new(
let mut executor = GrevmScheduler::new(
black_box(spec_id),
black_box(env.clone()),
black_box(db.clone()),
black_box(txs.clone()),
None,
);
executor.parallel_execute()
})
});

group.bench_function("Grevm Sequential", |b| {
b.iter(|| {
let executor = GrevmScheduler::new(
let mut executor = GrevmScheduler::new(
black_box(spec_id),
black_box(env.clone()),
black_box(db.clone()),
black_box(txs.clone()),
None,
);
executor.force_sequential_execute()
})
3 changes: 2 additions & 1 deletion src/lib.rs
@@ -25,7 +25,8 @@ use tokio::runtime::{Builder, Runtime};
mod hint;
mod partition;
mod scheduler;
mod storage;
/// Manages storage-related operations.
pub mod storage;
mod tx_dependency;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};

82 changes: 40 additions & 42 deletions src/scheduler.rs
@@ -2,7 +2,7 @@ use crate::{
fork_join_util,
hint::ParallelExecutionHints,
partition::PartitionExecutor,
storage::{LazyUpdateValue, SchedulerDB},
storage::{LazyUpdateValue, SchedulerDB, State},
tx_dependency::{DependentTxsVec, TxDependency},
GrevmError, LocationAndType, ResultAndTransition, TransactionStatus, TxId, CPU_CORES,
GREVM_RUNTIME, MAX_NUM_ROUND,
@@ -18,11 +18,10 @@ use std::{
use ahash::{AHashMap as HashMap, AHashSet as HashSet};
use metrics::{counter, gauge};
use revm::{
db::{states::bundle_state::BundleRetention, BundleState},
primitives::{
AccountInfo, Address, Bytecode, EVMError, Env, ExecutionResult, SpecId, TxEnv, B256, U256,
},
CacheState, DatabaseRef, EvmBuilder,
CacheState, DatabaseCommit, DatabaseRef, EvmBuilder,
};
use tracing::info;

@@ -70,8 +69,6 @@ struct ExecuteMetrics {
commit_transition_time: metrics::Counter,
/// Time taken to execute transactions in sequential(in nanoseconds).
sequential_execute_time: metrics::Counter,
/// Time taken to build output(in nanoseconds).
build_output_time: metrics::Counter,
}

impl Default for ExecuteMetrics {
@@ -97,16 +94,13 @@ impl Default for ExecuteMetrics {
merge_write_set_time: counter!("grevm.merge_write_set_time"),
commit_transition_time: counter!("grevm.commit_transition_time"),
sequential_execute_time: counter!("grevm.sequential_execute_time"),
build_output_time: counter!("grevm.build_output_time"),
}
}
}

/// The output of the execution of a block.
#[derive(Debug)]
pub struct ExecuteOutput {
/// The changed state of the block after execution.
pub state: BundleState,
/// All the results of the transactions in the block.
pub results: Vec<ExecutionResult>,
}
@@ -161,7 +155,7 @@
/// The database utilized by the scheduler.
/// It is shared among the partition executors,
/// allowing them to read the final state from previous rounds.
database: Arc<SchedulerDB<DB>>,
pub database: Arc<SchedulerDB<DB>>,

/// The dependency relationship between transactions.
/// Used to construct the next round of transaction partitions.
@@ -208,12 +202,14 @@
}
}

/// Creates a new GrevmScheduler instance.
/// Creates a new GrevmScheduler instance using DB type without 'static constraint.
/// If `state` is not None, it will be used as the initial state before the block execution.
pub fn new_grevm_scheduler<DB>(
spec_id: SpecId,
env: Env,
db: DB,
txs: Vec<TxEnv>,
state: Option<Box<State>>,
) -> GrevmScheduler<DatabaseWrapper<DB::Error>>
where
DB: DatabaseRef + Send + Sync,
@@ -230,7 +226,7 @@
>(boxed)
};
let db: DatabaseWrapper<DB::Error> = DatabaseWrapper(db);
GrevmScheduler::new(spec_id, env, db, Arc::new(txs))
GrevmScheduler::new(spec_id, env, db, Arc::new(txs), state)
}

impl<DB> GrevmScheduler<DB>
@@ -239,8 +235,14 @@
DB::Error: Send + Sync,
{
/// Creates a new GrevmScheduler instance.
#[fastrace::trace]
pub fn new(spec_id: SpecId, env: Env, db: DB, txs: Arc<Vec<TxEnv>>) -> Self {
/// If `state` is not None, it will be used as the initial state before the block execution.
pub fn new(
spec_id: SpecId,
env: Env,
db: DB,
txs: Arc<Vec<TxEnv>>,
state: Option<Box<State>>,
) -> Self {
let coinbase = env.block.coinbase;
let num_partitions = *CPU_CORES * 2 + 1; // 2 * cpu + 1 for initial partition number
let num_txs = txs.len();
@@ -250,7 +252,7 @@
env,
coinbase,
txs,
database: Arc::new(SchedulerDB::new(db)),
database: Arc::new(SchedulerDB::new(state.unwrap_or_default(), db)),
tx_dependencies: TxDependency::new(num_txs),
tx_states: Arc::new(vec![TxState::new(); num_txs]),
num_partitions,
@@ -303,13 +305,18 @@
}

let start = Instant::now();
GREVM_RUNTIME.block_on(async {
let mut tasks = vec![];
for executor in &self.partition_executors {
let executor = executor.clone();
tasks.push(GREVM_RUNTIME.spawn(async move { executor.write().unwrap().execute() }));
}
futures::future::join_all(tasks).await;
// Do not block tokio runtime if we are in async context
tokio::task::block_in_place(|| {
GREVM_RUNTIME.block_on(async {
let mut tasks = vec![];
for executor in &self.partition_executors {
let executor = executor.clone();
tasks.push(
GREVM_RUNTIME.spawn(async move { executor.write().unwrap().execute() }),
);
}
futures::future::join_all(tasks).await;
})
});
self.metrics.parallel_execute_time.increment(start.elapsed().as_nanos() as u64);

@@ -501,7 +508,7 @@
let database = Arc::get_mut(&mut self.database).unwrap();
if self.num_finality_txs < self.txs.len() {
// Merging these states is only useful when there is a next round of execution.
Self::merge_not_modified_state(&mut database.cache, partition_state);
Self::merge_not_modified_state(&mut database.state.cache, partition_state);
}

#[allow(invalid_reference_casting)]
@@ -599,21 +606,6 @@
Ok(())
}

#[fastrace::trace]
fn build_output(&mut self) -> ExecuteOutput {
let start = Instant::now();
// MUST drop the `PartitionExecutor::scheduler_db` before get mut
self.partition_executors.clear();
let database = Arc::get_mut(&mut self.database).unwrap();
database.merge_transitions(BundleRetention::Reverts);
let output = ExecuteOutput {
state: std::mem::take(&mut database.bundle_state),
results: std::mem::take(&mut self.results),
};
self.metrics.build_output_time.increment(start.elapsed().as_nanos() as u64);
output
}

#[fastrace::trace]
fn parse_hints(&mut self) {
let start = Instant::now();
@@ -643,7 +635,7 @@

if self.txs.len() < self.num_partitions && !force_parallel {
self.execute_remaining_sequential()?;
return Ok(self.build_output());
return Ok(ExecuteOutput { results: std::mem::take(&mut self.results) });
}

if !force_sequential {
@@ -668,25 +660,31 @@
self.execute_remaining_sequential()?;
}

Ok(self.build_output())
Ok(ExecuteOutput { results: std::mem::take(&mut self.results) })
}

/// Execute transactions in parallel.
pub fn parallel_execute(mut self) -> Result<ExecuteOutput, GrevmError<DB::Error>> {
pub fn parallel_execute(&mut self) -> Result<ExecuteOutput, GrevmError<DB::Error>> {
self.evm_execute(None, true, None)
}

/// Execute transactions parallelly with or without hints.
pub fn force_parallel_execute(
mut self,
&mut self,
with_hints: bool,
num_partitions: Option<usize>,
) -> Result<ExecuteOutput, GrevmError<DB::Error>> {
self.evm_execute(Some(false), with_hints, num_partitions)
}

/// Execute transactions sequentially.
pub fn force_sequential_execute(mut self) -> Result<ExecuteOutput, GrevmError<DB::Error>> {
pub fn force_sequential_execute(&mut self) -> Result<ExecuteOutput, GrevmError<DB::Error>> {
self.evm_execute(Some(true), false, None)
}

/// Take the state of the scheduler.
/// It is typically called after the execution.
pub fn take_state(self) -> Box<State> {
Arc::try_unwrap(self.database).ok().unwrap().state
}
}
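The net effect of the scheduler changes: execution methods now borrow the scheduler (`&mut self`), `ExecuteOutput` carries only the transaction results, and the post-execution state is retrieved separately with `take_state`. A hedged caller-side sketch follows; the crate/module paths (`grevm::...`, `grevm::storage::State`) and the `InMemoryDB`/`Infallible` pairing are assumptions based on this diff and the benchmarks above, not verified against the full source.

```rust
use std::{convert::Infallible, sync::Arc};

use grevm::{storage::State, GrevmError, GrevmScheduler};
use revm::{
    db::InMemoryDB,
    primitives::{Env, ExecutionResult, SpecId, TxEnv},
};

/// Execute one block and return the per-transaction results together with the
/// post-execution state, which is now taken from the scheduler explicitly
/// instead of being bundled into `ExecuteOutput`.
fn execute_block(
    env: Env,
    db: InMemoryDB,
    txs: Vec<TxEnv>,
) -> Result<(Vec<ExecutionResult>, Box<State>), GrevmError<Infallible>> {
    // `None`: start from an empty state rather than one carried over by the caller.
    let mut executor = GrevmScheduler::new(SpecId::LATEST, env, db, Arc::new(txs), None);
    let output = executor.parallel_execute()?; // now `&mut self`, not `self`
    let state = executor.take_state(); // consumes the scheduler
    Ok((output.results, state))
}
```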