Skip to content

Commit

Permalink
fix: make the tx that access miner balance in contract redo
Browse files Browse the repository at this point in the history
  • Loading branch information
AshinGau committed Nov 23, 2024
1 parent c126ad1 commit 1e382ff
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 157 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ edition = "2021"
description = "Create Parallel EVM"

[dependencies]
revm = "14.0.0"
revm = { package = "revm", git = "https://github.com/nekomoto911/revm", branch = "v14.0.0" }
revm-primitives = { package = "revm-primitives", git = "https://github.com/nekomoto911/revm", branch = "v14.0.0" }
fastrace = "0.7"
tracing = "0.1.40"
ahash = { version = "0.8.11", features = ["serde"] }
Expand Down Expand Up @@ -33,7 +34,7 @@ criterion = "0.5.1"
metrics-util = "0.17.0"
walkdir = "2.5.0"
rayon = "1.10.0"
revme = "0.10.3"
revme = { package = "revme", git = "https://github.com/nekomoto911/revm", branch = "v14.0.0" }
fastrace = { version = "0.7", features = ["enable"] }
fastrace-jaeger = "0.7"
tikv-jemallocator = "0.5.0"
Expand Down
3 changes: 1 addition & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ lazy_static! {
.unwrap();
}

use crate::storage::LazyUpdateValue;
pub use scheduler::*;

/// The maximum number of rounds for transaction execution.
Expand Down Expand Up @@ -133,7 +132,7 @@ pub(crate) struct ResultAndTransition {
pub transition: Vec<(Address, TransitionAccount)>,

/// Rewards to miner.
pub miner_update: LazyUpdateValue,
pub rewards: u128,
}

/// Utility function for parallel execution using fork-join pattern.
Expand Down
36 changes: 20 additions & 16 deletions src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use revm::{
primitives::{Address, EVMError, Env, ResultAndState, SpecId, TxEnv, TxKind},
DatabaseRef, EvmBuilder,
};
use revm_primitives::db::Database;
use std::{
collections::BTreeSet,
sync::Arc,
Expand Down Expand Up @@ -97,10 +98,11 @@ where
/// validation process.
pub(crate) fn execute(&mut self) {
let start = Instant::now();
let coinbase = self.env.block.coinbase;
let mut evm = EvmBuilder::default()
.with_db(&mut self.partition_db)
.with_spec_id(self.spec_id)
.with_env(Box::new(self.env.clone()))
.with_env(Box::new(std::mem::take(&mut self.env)))
.build();

#[allow(invalid_reference_casting)]
Expand All @@ -112,6 +114,16 @@ where

if let Some(tx) = self.txs.get(txid) {
*evm.tx_mut() = tx.clone();
let mut raw_transfer = true;
if let Ok(Some(info)) = evm.db_mut().basic(tx.caller) {
raw_transfer &= info.is_empty_code_hash();
}
if let TxKind::Call(to) = tx.transact_to {
if let Ok(Some(info)) = evm.db_mut().basic(to) {
raw_transfer &= info.is_empty_code_hash();
}
}
evm.db_mut().raw_transfer = raw_transfer;
} else {
panic!("Wrong transactions ID");
}
Expand All @@ -129,13 +141,12 @@ where
}
}
if should_execute {
let result = evm.transact();
let result = evm.transact_lazy_reward();
match result {
Ok(result_and_state) => {
let ResultAndState { result, mut state } = result_and_state;
let mut read_set = evm.db_mut().take_read_set();
let (write_set, miner_update, remove_miner) =
evm.db().generate_write_set(&mut state);
let ResultAndState { result, mut state, rewards } = result_and_state;
let read_set = evm.db_mut().take_read_set();
let write_set = evm.db().generate_write_set(&mut state);

// Check if the transaction can be skipped
// skip_validation=true does not necessarily mean the transaction can skip
Expand All @@ -144,19 +155,12 @@ where
// if a transaction with a smaller TxID conflicts,
// the states of subsequent transactions are invalid.
let mut skip_validation =
!matches!(read_set.get(&LocationAndType::Basic(coinbase)), Some(None));
skip_validation &=
read_set.iter().all(|l| tx_states[txid].read_set.contains_key(l.0));
skip_validation &=
write_set.iter().all(|l| tx_states[txid].write_set.contains(l));

if remove_miner {
// remove miner's state if we handle rewards separately
state.remove(&self.coinbase);
} else {
// add miner to read set, because it's in write set.
// set miner's value to None to make this tx redo in next round if
// unconfirmed.
read_set.insert(LocationAndType::Basic(self.coinbase), None);
}
// temporary commit to cache_db, to make use the remaining txs can read the
// updated data
let transition = evm.db_mut().temporary_commit(state);
Expand All @@ -171,7 +175,7 @@ where
execute_result: ResultAndTransition {
result: Some(result),
transition,
miner_update,
rewards,
},
};
}
Expand Down
70 changes: 44 additions & 26 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,29 @@ use crate::{
fork_join_util,
hint::ParallelExecutionHints,
partition::PartitionExecutor,
storage::{LazyUpdateValue, SchedulerDB, State},
storage::{SchedulerDB, State},
tx_dependency::{DependentTxsVec, TxDependency},
GrevmError, LocationAndType, ResultAndTransition, TransactionStatus, TxId, CPU_CORES,
GREVM_RUNTIME, MAX_NUM_ROUND,
};
use fastrace::Span;
use std::{
collections::BTreeSet,
ops::DerefMut,
sync::{Arc, RwLock},
time::{Duration, Instant},
};

use ahash::{AHashMap as HashMap, AHashSet as HashSet};
use fastrace::Span;
use metrics::{counter, gauge};
use revm::{
primitives::{
AccountInfo, Address, Bytecode, EVMError, Env, ExecutionResult, SpecId, TxEnv, B256, U256,
},
CacheState, DatabaseCommit, DatabaseRef, EvmBuilder,
};
use std::{
collections::BTreeSet,
ops::DerefMut,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, RwLock,
},
time::{Duration, Instant},
};
use tracing::info;

struct ExecuteMetrics {
Expand Down Expand Up @@ -355,7 +357,7 @@ where
/// and there is no need to record the dependency and dependent relationships of these
/// transactions. Thus achieving the purpose of pruning.
#[fastrace::trace]
fn update_and_pruning_dependency(&mut self) {
fn update_and_pruning_dependency(&mut self, max_miner_involved_tx: TxId) {
let num_finality_txs = self.num_finality_txs;
if num_finality_txs == self.txs.len() {
return;
Expand All @@ -365,14 +367,20 @@ where
for executor in &self.partition_executors {
let executor = executor.read().unwrap();
for (txid, dep) in executor.assigned_txs.iter().zip(executor.tx_dependency.iter()) {
if *txid >= num_finality_txs {
// pruning the tx that is finality state
new_dependency[*txid - num_finality_txs] = dep
.clone()
.into_iter()
// pruning the dependent tx that is finality state
.filter(|dep_id| *dep_id >= num_finality_txs)
.collect();
let txid = *txid;
if txid >= num_finality_txs && txid >= max_miner_involved_tx {
if txid == max_miner_involved_tx {
new_dependency[txid - num_finality_txs] =
(num_finality_txs..max_miner_involved_tx).collect();
} else {
// pruning the tx that is finality state
new_dependency[txid - num_finality_txs] = dep
.clone()
.into_iter()
// pruning the dependent tx that is finality state
.filter(|dep_id| *dep_id >= num_finality_txs)
.collect();
}
}
}
}
Expand All @@ -382,10 +390,13 @@ where
/// Generate unconfirmed transactions, and find the continuous minimum TxID,
/// which can be marked as finality transactions.
#[fastrace::trace]
fn generate_unconfirmed_txs(&mut self) {
fn generate_unconfirmed_txs(&mut self) -> TxId {
let num_partitions = self.num_partitions;
let (end_skip_id, merged_write_set) = self.merge_write_set();
self.metrics.skip_validation_cnt.increment((end_skip_id - self.num_finality_txs) as u64);
let miner_location = LocationAndType::Basic(self.coinbase);
let min_tx_id = self.num_finality_txs;
let max_miner_involved_tx = AtomicUsize::new(min_tx_id);
fork_join_util(num_partitions, Some(num_partitions), |_, _, part| {
// Transaction validation process:
// 1. For each transaction in each partition, traverse its read set and find the largest
Expand All @@ -401,12 +412,18 @@ where
let tx_states =
unsafe { &mut *(&(*self.tx_states) as *const Vec<TxState> as *mut Vec<TxState>) };

for txid in executor.assigned_txs.iter() {
for (index, txid) in executor.assigned_txs.iter().enumerate() {
let txid = *txid;
let mut conflict = tx_states[txid].tx_status == TransactionStatus::Conflict;
let mut updated_dependencies = BTreeSet::new();
if txid >= end_skip_id {
for (location, _) in tx_states[txid].read_set.iter() {
for (location, balance) in tx_states[txid].read_set.iter() {
if *location == miner_location {
if balance.is_none() && txid - min_tx_id != index {
conflict = true;
max_miner_involved_tx.fetch_max(txid, Ordering::Relaxed);
}
}
if let Some(written_txs) = merged_write_set.get(location) {
if let Some(previous_txid) = written_txs.range(..txid).next_back() {
// update dependencies: previous_txid <- txid
Expand All @@ -430,6 +447,7 @@ where
}
}
});
max_miner_involved_tx.load(Ordering::Acquire)
}

/// Find the continuous minimum TxID, which can be marked as finality transactions.
Expand Down Expand Up @@ -514,12 +532,12 @@ where
#[allow(invalid_reference_casting)]
let tx_states =
unsafe { &mut *(&(*self.tx_states) as *const Vec<TxState> as *mut Vec<TxState>) };
let mut miner_updates = Vec::with_capacity(finality_tx_cnt);
let start_txid = self.num_finality_txs - finality_tx_cnt;

let span = Span::enter_with_local_parent("database commit transitions");
let mut rewards = 0;
for txid in start_txid..self.num_finality_txs {
miner_updates.push(tx_states[txid].execute_result.miner_update.clone());
rewards += tx_states[txid].execute_result.rewards;
database
.commit_transition(std::mem::take(&mut tx_states[txid].execute_result.transition));
self.results.push(tx_states[txid].execute_result.result.clone().unwrap());
Expand All @@ -532,7 +550,7 @@ where
// set, and track the rewards for the miner for each transaction separately.
// The miner’s account is only updated after validation by SchedulerDB.increment_balances
database
.update_balances(vec![(self.coinbase, LazyUpdateValue::merge(miner_updates))])
.increment_balances(vec![(self.coinbase, rewards)])
.map_err(|err| GrevmError::EvmError(EVMError::Database(err)))?;
self.metrics.commit_transition_time.increment(start.elapsed().as_nanos() as u64);
Ok(())
Expand All @@ -544,10 +562,10 @@ where
#[fastrace::trace]
fn validate_transactions(&mut self) -> Result<(), GrevmError<DB::Error>> {
let start = Instant::now();
self.generate_unconfirmed_txs();
let max_miner_involved_tx = self.generate_unconfirmed_txs();
let finality_tx_cnt = self.find_continuous_min_txid()?;
// update and pruning tx dependencies
self.update_and_pruning_dependency();
self.update_and_pruning_dependency(max_miner_involved_tx);
self.commit_transition(finality_tx_cnt)?;
self.metrics.validate_time.increment(start.elapsed().as_nanos() as u64);
Ok(())
Expand Down
Loading

0 comments on commit 1e382ff

Please sign in to comment.