Skip to content

Commit

Permalink
fix: make the tx that access miner balance in contract redo (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
AshinGau authored Nov 26, 2024
1 parent c7bec85 commit f40762c
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 161 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ edition = "2021"
description = "Create Parallel EVM"

[dependencies]
revm = { package = "revm", git = "https://github.com/galxe/revm", rev = "875f5ad" }
revm-primitives = { package = "revm-primitives", git = "https://github.com/galxe/revm", rev = "875f5ad" }
revm = { package = "revm", git = "https://github.com/galxe/revm", rev = "20b5883" }
revm-primitives = { package = "revm-primitives", git = "https://github.com/galxe/revm", rev = "20b5883" }
fastrace = "0.7"
tracing = "0.1.40"
ahash = { version = "0.8.11", features = ["serde"] }
rayon = "1.10.0"
atomic = "0.6.0"

# Alloy
alloy-chains = "0.1.18"
Expand All @@ -34,7 +35,7 @@ criterion = "0.5.1"
metrics-util = "0.17.0"
walkdir = "2.5.0"
rayon = "1.10.0"
revme = { package = "revme", git = "https://github.com/galxe/revm", rev = "875f5ad" }
revme = { package = "revme", git = "https://github.com/galxe/revm", rev = "20b5883" }
fastrace = { version = "0.7", features = ["enable"] }
fastrace-jaeger = "0.7"
tikv-jemallocator = "0.5.0"
Expand Down
3 changes: 1 addition & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ lazy_static! {
.unwrap();
}

use crate::storage::LazyUpdateValue;
pub use scheduler::*;

/// The maximum number of rounds for transaction execution.
Expand Down Expand Up @@ -133,7 +132,7 @@ pub(crate) struct ResultAndTransition {
pub transition: Vec<(Address, TransitionAccount)>,

/// Rewards to miner.
pub miner_update: LazyUpdateValue,
pub rewards: u128,
}

/// Utility function for parallel execution using fork-join pattern.
Expand Down
78 changes: 57 additions & 21 deletions src/partition.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
scheduler::RewardsAccumulators,
storage::{PartitionDB, SchedulerDB},
LocationAndType, PartitionId, ResultAndTransition, SharedTxStates, TransactionStatus, TxId,
TxState,
Expand All @@ -8,9 +9,10 @@ use revm::{
primitives::{Address, EVMError, Env, ResultAndState, SpecId, TxEnv, TxKind},
DatabaseRef, EvmBuilder,
};
use revm_primitives::db::Database;
use std::{
collections::BTreeSet,
sync::Arc,
sync::{atomic::Ordering, Arc},
time::{Duration, Instant},
};

Expand Down Expand Up @@ -51,6 +53,8 @@ where
tx_states: SharedTxStates,
txs: Arc<Vec<TxEnv>>,

rewards_accumulators: Arc<RewardsAccumulators>,

pub partition_db: PartitionDB<DB>,
pub assigned_txs: Vec<TxId>,

Expand All @@ -69,20 +73,22 @@ where
spec_id: SpecId,
partition_id: PartitionId,
env: Env,
rewards_accumulators: Arc<RewardsAccumulators>,
scheduler_db: Arc<SchedulerDB<DB>>,
txs: Arc<Vec<TxEnv>>,
tx_states: SharedTxStates,
assigned_txs: Vec<TxId>,
) -> Self {
let coinbase = env.block.coinbase;
let partition_db = PartitionDB::new(coinbase, scheduler_db);
let partition_db = PartitionDB::new(coinbase, scheduler_db, rewards_accumulators.clone());
Self {
spec_id,
env,
coinbase,
partition_id,
tx_states,
txs,
rewards_accumulators,
partition_db,
assigned_txs,
error_txs: HashMap::new(),
Expand All @@ -97,10 +103,11 @@ where
/// validation process.
pub(crate) fn execute(&mut self) {
let start = Instant::now();
let coinbase = self.env.block.coinbase;
let mut evm = EvmBuilder::default()
.with_db(&mut self.partition_db)
.with_spec_id(self.spec_id)
.with_env(Box::new(self.env.clone()))
.with_env(Box::new(std::mem::take(&mut self.env)))
.build();

#[allow(invalid_reference_casting)]
Expand All @@ -112,30 +119,46 @@ where

if let Some(tx) = self.txs.get(txid) {
*evm.tx_mut() = tx.clone();
evm.db_mut().current_txid = txid;
evm.db_mut().raw_transfer = true; // no need to wait miner rewards
let mut raw_transfer = true;
if let Ok(Some(info)) = evm.db_mut().basic(tx.caller) {
raw_transfer = info.is_empty_code_hash();
}
if let TxKind::Call(to) = tx.transact_to {
if let Ok(Some(info)) = evm.db_mut().basic(to) {
raw_transfer &= info.is_empty_code_hash();
}
}
evm.db_mut().raw_transfer = raw_transfer;
evm.db_mut().take_read_set(); // clean read set
} else {
panic!("Wrong transactions ID");
}
// If the transaction is unconfirmed, it may not require repeated execution
let mut should_execute = true;
if tx_states[txid].tx_status == TransactionStatus::Unconfirmed {
let mut update_rewards = 0;
if tx_states[txid].tx_status == TransactionStatus::Unconfirmed &&
!self.rewards_accumulators.contains_key(&txid)
{
if evm.db_mut().check_read_set(&tx_states[txid].read_set) {
// Unconfirmed transactions from the previous round might not need to be
// re-executed.
let transition = &tx_states[txid].execute_result.transition;
evm.db_mut().temporary_commit_transition(transition);
should_execute = false;
self.metrics.reusable_tx_cnt += 1;
update_rewards = tx_states[txid].execute_result.rewards;
tx_states[txid].tx_status = TransactionStatus::SkipValidation;
}
}
if should_execute {
let result = evm.transact();
let result = evm.transact_lazy_reward();
match result {
Ok(result_and_state) => {
let ResultAndState { result, mut state } = result_and_state;
let mut read_set = evm.db_mut().take_read_set();
let (write_set, miner_update, remove_miner) =
evm.db().generate_write_set(&mut state);
let ResultAndState { result, mut state, rewards } = result_and_state;
let read_set = evm.db_mut().take_read_set();
let write_set = evm.db().generate_write_set(&mut state);

// Check if the transaction can be skipped
// skip_validation=true does not necessarily mean the transaction can skip
Expand All @@ -144,19 +167,13 @@ where
// if a transaction with a smaller TxID conflicts,
// the states of subsequent transactions are invalid.
let mut skip_validation =
!matches!(read_set.get(&LocationAndType::Basic(coinbase)), Some(None));
skip_validation &= !self.rewards_accumulators.contains_key(&txid);
skip_validation &=
read_set.iter().all(|l| tx_states[txid].read_set.contains_key(l.0));
skip_validation &=
write_set.iter().all(|l| tx_states[txid].write_set.contains(l));

if remove_miner {
// remove miner's state if we handle rewards separately
state.remove(&self.coinbase);
} else {
// add miner to read set, because it's in write set.
// set miner's value to None to make this tx redo in next round if
// unconfirmed.
read_set.insert(LocationAndType::Basic(self.coinbase), None);
}
// temporary commit to cache_db, to make use the remaining txs can read the
// updated data
let transition = evm.db_mut().temporary_commit(state);
Expand All @@ -171,9 +188,10 @@ where
execute_result: ResultAndTransition {
result: Some(result),
transition,
miner_update,
rewards,
},
};
update_rewards = rewards;
}
Err(err) => {
// In a parallel execution environment, transactions might fail due to
Expand All @@ -188,10 +206,10 @@ where
let mut read_set = evm.db_mut().take_read_set();
// update write set with the caller and transact_to
let mut write_set = HashSet::new();
read_set.insert(LocationAndType::Basic(evm.tx().caller), None);
read_set.entry(LocationAndType::Basic(evm.tx().caller)).or_insert(None);
write_set.insert(LocationAndType::Basic(evm.tx().caller));
if let TxKind::Call(to) = evm.tx().transact_to {
read_set.insert(LocationAndType::Basic(to), None);
read_set.entry(LocationAndType::Basic(to)).or_insert(None);
write_set.insert(LocationAndType::Basic(to));
}

Expand All @@ -205,7 +223,25 @@ where
}
}
}
if !self.rewards_accumulators.is_empty() {
Self::report_rewards(self.rewards_accumulators.clone(), txid, update_rewards);
}
}
self.metrics.execute_time = start.elapsed();
}

fn report_rewards(rewards_accumulators: Arc<RewardsAccumulators>, txid: TxId, rewards: u128) {
if !rewards_accumulators.is_empty() {
for (_, accumulator) in rewards_accumulators.range((txid + 1)..) {
let counter = accumulator.accumulate_counter.fetch_add(1, Ordering::Release);
accumulator.accumulate_rewards.fetch_add(rewards, Ordering::Release);
if counter >= accumulator.accumulate_num {
panic!("to many reward records!");
}
if counter == accumulator.accumulate_num - 1 {
accumulator.notifier.notify_one();
}
}
}
}
}
Loading

0 comments on commit f40762c

Please sign in to comment.