Skip to content

Commit

Permalink
fix the test bug
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard1048576 committed Oct 25, 2024
1 parent 9904292 commit bad3034
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 10 deletions.
8 changes: 4 additions & 4 deletions src/hint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,15 @@ impl ParallelExecutionHints {
/// However, this would introduce locking overhead and impact performance.
/// The primary consideration is that developers are aware there are no conflicts between transactions, making the `Mutex` approach unnecessarily verbose and cumbersome.
#[fastrace::trace]
pub(crate) fn parse_hints(&self, txs: Arc<Vec<TxEnv>>) {
pub(crate) fn parse_hints(&self, txs: &Arc<Vec<TxEnv>>) {
// Utilize fork-join utility to process transactions in parallel
fork_join_util(txs.len(), None, |start_tx, end_tx, _| {
#[allow(invalid_reference_casting)]
let hints =
unsafe { &mut *(&(*self.tx_states) as *const Vec<TxState> as *mut Vec<TxState>) };
for index in start_tx..end_tx {
let tx_env = &txs[index];
let rw_set = &mut hints[index];
for tx_index in start_tx..end_tx {
let tx_env = &txs[tx_index];
let rw_set = &mut hints[tx_index];
// Insert caller's basic location into read-write set
rw_set.insert_location(LocationAndType::Basic(tx_env.caller), RWType::ReadWrite);
if let TxKind::Call(to_address) = tx_env.transact_to {
Expand Down
37 changes: 32 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ use revm::primitives::{Address, EVMError, ExecutionResult, U256};
use revm::TransitionAccount;
use std::cmp::min;
use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use revm::precompile::HashMap;
use tokio::runtime::{Builder, Runtime};
mod hint;
mod partition;
Expand Down Expand Up @@ -156,20 +159,44 @@ where
F: Fn(usize, usize, usize) + Send + Sync + 'scope,
{
let parallel_cnt = num_partitions.unwrap_or(*CPU_CORES * 2 + 1);
let index = AtomicUsize::new(0);
let partition_id = AtomicUsize::new(0);
let remaining = num_elements % parallel_cnt;
let chunk_size = num_elements / parallel_cnt;
thread::scope(|scope| {
for _ in 0..parallel_cnt {
scope.spawn(|| {
let index = index.fetch_add(1, Ordering::SeqCst);
let start_pos = chunk_size * index + min(index, remaining);
let partition_id = partition_id.fetch_add(1, Ordering::SeqCst);
let start_pos = chunk_size * partition_id + min(partition_id, remaining);
let mut end_pos = start_pos + chunk_size;
if index < remaining {
if partition_id < remaining {
end_pos += 1;
}
f(start_pos, end_pos, index);
f(start_pos, end_pos, partition_id);
});
}
});
}

#[test]
fn test_fork_join_util() {
let num_elements: usize = 9008;
let num_partitions: usize = 10;
let groups: Arc<Mutex<HashMap<usize, (usize, usize)>>> = Arc::new(Mutex::new(HashMap::new()));
fork_join_util(num_elements, Some(num_partitions), |start_pos, end_pos, index| {
groups.lock().unwrap().insert(index, (start_pos, end_pos));
});

let target_group = HashMap::from([
(0, (0, 901)),
(1, (901, 1802)),
(2, (1802, 2703)),
(3, (2703, 3604)),
(4, (3604, 4505)),
(5, (4505, 5406)),
(6, (5406, 6307)),
(7, (6307, 7208)),
(8, (7208, 8108)),
(9, (8108, 9008)),
]);
assert_eq!(*groups.lock().unwrap(), target_group);
}
2 changes: 1 addition & 1 deletion src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ where
fn parse_hints(&mut self) {
let start = Instant::now();
let hints = ParallelExecutionHints::new(self.tx_states.clone());
hints.parse_hints(self.txs.clone());
hints.parse_hints(&self.txs);
self.tx_dependencies.init_tx_dependency(self.tx_states.clone());
self.metrics.parse_hints_time.increment(start.elapsed().as_nanos() as u64);
}
Expand Down

0 comments on commit bad3034

Please sign in to comment.