wait and notify
AshinGau committed Nov 25, 2024
1 parent 6b4cf06 commit 977f927
Showing 3 changed files with 137 additions and 38 deletions.
35 changes: 32 additions & 3 deletions src/partition.rs
@@ -14,6 +14,8 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use std::sync::atomic::Ordering;
use crate::scheduler::{RewardsAccumulator, RewardsAccumulators};

#[derive(Default)]
pub(crate) struct PartitionMetrics {
@@ -52,6 +54,8 @@ where
tx_states: SharedTxStates,
txs: Arc<Vec<TxEnv>>,

rewards_accumulators: Arc<RewardsAccumulators>,

pub partition_db: PartitionDB<DB>,
pub assigned_txs: Vec<TxId>,

@@ -70,20 +74,22 @@
spec_id: SpecId,
partition_id: PartitionId,
env: Env,
rewards_accumulators: Arc<RewardsAccumulators>,
scheduler_db: Arc<SchedulerDB<DB>>,
txs: Arc<Vec<TxEnv>>,
tx_states: SharedTxStates,
assigned_txs: Vec<TxId>,
) -> Self {
let coinbase = env.block.coinbase;
let partition_db = PartitionDB::new(coinbase, scheduler_db);
let partition_db = PartitionDB::new(coinbase, scheduler_db, rewards_accumulators.clone());
Self {
spec_id,
env,
coinbase,
partition_id,
tx_states,
txs,
rewards_accumulators,
partition_db,
assigned_txs,
error_txs: HashMap::new(),
@@ -114,6 +120,7 @@ where

if let Some(tx) = self.txs.get(txid) {
*evm.tx_mut() = tx.clone();
evm.db_mut().current_txid = txid;
let mut raw_transfer = true;
if let Ok(Some(info)) = evm.db_mut().basic(tx.caller) {
raw_transfer &= info.is_empty_code_hash();
@@ -124,11 +131,13 @@
}
}
evm.db_mut().raw_transfer = raw_transfer;
evm.db_mut().take_read_set(); // clean read set
} else {
panic!("Wrong transactions ID");
}
// If the transaction is unconfirmed, it may not require repeated execution
let mut should_execute = true;
let mut update_rewards = 0;
if tx_states[txid].tx_status == TransactionStatus::Unconfirmed {
if evm.db_mut().check_read_set(&tx_states[txid].read_set) {
// Unconfirmed transactions from the previous round might not need to be
@@ -137,6 +146,7 @@
evm.db_mut().temporary_commit_transition(transition);
should_execute = false;
self.metrics.reusable_tx_cnt += 1;
update_rewards = tx_states[txid].execute_result.rewards;
tx_states[txid].tx_status = TransactionStatus::SkipValidation;
}
}
@@ -178,6 +188,7 @@ where
rewards,
},
};
update_rewards = rewards;
}
Err(err) => {
// In a parallel execution environment, transactions might fail due to
@@ -192,10 +203,10 @@
let mut read_set = evm.db_mut().take_read_set();
// update write set with the caller and transact_to
let mut write_set = HashSet::new();
read_set.insert(LocationAndType::Basic(evm.tx().caller), None);
read_set.entry(LocationAndType::Basic(evm.tx().caller)).or_insert(None);
write_set.insert(LocationAndType::Basic(evm.tx().caller));
if let TxKind::Call(to) = evm.tx().transact_to {
read_set.insert(LocationAndType::Basic(to), None);
read_set.entry(LocationAndType::Basic(to)).or_insert(None);
write_set.insert(LocationAndType::Basic(to));
}

@@ -209,7 +220,25 @@
}
}
}
if !self.rewards_accumulators.is_empty() {
Self::report_rewards(self.rewards_accumulators.clone(), txid, update_rewards);
}
}
self.metrics.execute_time = start.elapsed();
}

fn report_rewards(rewards_accumulators: Arc<RewardsAccumulators>, txid: TxId, rewards: u128) {
if !rewards_accumulators.is_empty() {
if let Some((_, accumulator)) = rewards_accumulators.range((txid + 1)..).next() {
let counter = accumulator.accumulate_counter.fetch_add(1, Ordering::Release);
accumulator.accumulate_rewards.fetch_add(rewards, Ordering::Release);
if counter >= accumulator.accumulate_num {
panic!("to many reward records!");
}
if counter == accumulator.accumulate_num - 1 {
accumulator.notifier.notify_one();
}
}
}
}
}
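A note on the routing above: `report_rewards` relies on `BTreeMap::range((txid + 1)..).next()` returning the accumulator with the smallest key strictly greater than the reporting transaction, so each reward report lands on the nearest miner-involved transaction above it, and reports past the last accumulator are simply dropped. Below is a minimal, self-contained illustration of that lookup; the txids 8 and 12 and the string values are made up for the example and are not part of this commit.

```rust
use std::collections::BTreeMap;

fn main() {
    // Hypothetical miner-involved transactions at txids 8 and 12.
    let accumulators: BTreeMap<usize, &str> = BTreeMap::from([(8, "acc-8"), (12, "acc-12")]);

    // A report from txid 6 is routed to the nearest accumulator above it.
    assert_eq!(accumulators.range((6 + 1)..).next(), Some((&8, &"acc-8")));

    // A report from txid 9 skips key 8 and lands on 12.
    assert_eq!(accumulators.range((9 + 1)..).next(), Some((&12, &"acc-12")));

    // Reports past the last accumulator have no receiver and are dropped.
    assert!(accumulators.range((12 + 1)..).next().is_none());
}
```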
87 changes: 62 additions & 25 deletions src/scheduler.rs
@@ -25,6 +25,12 @@ use std::{
},
time::{Duration, Instant},
};
use std::collections::BTreeMap;
use std::sync::Mutex;
use atomic::Atomic;
use dashmap::DashSet;
use tokio::sync::Notify;
use tracing::info;

struct ExecuteMetrics {
@@ -132,6 +138,26 @@ impl TxState {
}
}

pub(crate) struct RewardsAccumulator {
pub accumulate_num: usize,
pub accumulate_counter: AtomicUsize,
pub accumulate_rewards: Atomic<u128>,
pub notifier: Arc<Notify>,
}

impl RewardsAccumulator {
pub(crate) fn new(accumulate_num: usize) -> Self {
Self {
accumulate_num,
accumulate_counter: AtomicUsize::new(0),
accumulate_rewards: Atomic::<u128>::new(0),
notifier: Arc::new(Notify::new()),
}
}
}

pub(crate) type RewardsAccumulators = BTreeMap<TxId, RewardsAccumulator>;

/// A shared reference to a vector of transaction states.
/// Used to share the transaction states between the partition executors.
/// Since the state of a transaction is not modified by multiple threads simultaneously,
@@ -176,6 +202,8 @@
num_finality_txs: usize,
results: Vec<ExecutionResult>,

rewards_accumulators: Arc<RewardsAccumulators>,

metrics: ExecuteMetrics,
}

@@ -262,6 +290,7 @@
partition_executors: vec![],
num_finality_txs: 0,
results: Vec::with_capacity(num_txs),
rewards_accumulators: Arc::new(RewardsAccumulators::new()),
metrics: Default::default(),
}
}
@@ -298,6 +327,7 @@
self.spec_id,
partition_id,
self.env.clone(),
self.rewards_accumulators.clone(),
self.database.clone(),
self.txs.clone(),
self.tx_states.clone(),
@@ -357,7 +387,7 @@
/// and there is no need to record the dependency and dependent relationships of these
/// transactions. Thus achieving the purpose of pruning.
#[fastrace::trace]
fn update_and_pruning_dependency(&mut self, max_miner_involved_tx: TxId) {
fn update_and_pruning_dependency(&mut self) {
let num_finality_txs = self.num_finality_txs;
if num_finality_txs == self.txs.len() {
return;
@@ -368,19 +398,14 @@
let executor = executor.read().unwrap();
for (txid, dep) in executor.assigned_txs.iter().zip(executor.tx_dependency.iter()) {
let txid = *txid;
if txid >= num_finality_txs && txid >= max_miner_involved_tx {
if txid == max_miner_involved_tx {
new_dependency[txid - num_finality_txs] =
(num_finality_txs..max_miner_involved_tx).collect();
} else {
// pruning the tx that is finality state
new_dependency[txid - num_finality_txs] = dep
.clone()
.into_iter()
// pruning the dependent tx that is finality state
.filter(|dep_id| *dep_id >= num_finality_txs)
.collect();
}
if txid >= num_finality_txs {
// pruning the tx that is finality state
new_dependency[txid - num_finality_txs] = dep
.clone()
.into_iter()
// pruning the dependent tx that is finality state
.filter(|dep_id| *dep_id >= num_finality_txs)
.collect();
}
}
}
@@ -390,13 +415,12 @@
/// Generate unconfirmed transactions, and find the continuous minimum TxID,
/// which can be marked as finality transactions.
#[fastrace::trace]
fn generate_unconfirmed_txs(&mut self) -> TxId {
fn generate_unconfirmed_txs(&mut self) -> Vec<TxId> {
let num_partitions = self.num_partitions;
let (end_skip_id, merged_write_set) = self.merge_write_set();
self.metrics.skip_validation_cnt.increment((end_skip_id - self.num_finality_txs) as u64);
let miner_location = LocationAndType::Basic(self.coinbase);
let min_tx_id = self.num_finality_txs;
let max_miner_involved_tx = AtomicUsize::new(min_tx_id);
let miner_involved_txs = DashSet::new();
fork_join_util(num_partitions, Some(num_partitions), |_, _, part| {
// Transaction validation process:
// 1. For each transaction in each partition, traverse its read set and find the largest
@@ -412,17 +436,17 @@
let tx_states =
unsafe { &mut *(&(*self.tx_states) as *const Vec<TxState> as *mut Vec<TxState>) };

for (index, txid) in executor.assigned_txs.iter().enumerate() {
for txid in executor.assigned_txs.iter() {
let txid = *txid;
let mut conflict = tx_states[txid].tx_status == TransactionStatus::Conflict;
let mut updated_dependencies = BTreeSet::new();
if txid >= end_skip_id {
for (location, balance) in tx_states[txid].read_set.iter() {
if *location == miner_location {
if balance.is_none() && txid - min_tx_id != index {
if *location == miner_location && balance.is_none() {
if txid != self.num_finality_txs && !self.rewards_accumulators.contains_key(&txid) {
conflict = true;
max_miner_involved_tx.fetch_max(txid, Ordering::Relaxed);
}
miner_involved_txs.insert(txid);
}
if let Some(written_txs) = merged_write_set.get(location) {
if let Some(previous_txid) = written_txs.range(..txid).next_back() {
@@ -447,7 +471,7 @@
}
}
});
max_miner_involved_tx.load(Ordering::Acquire)
miner_involved_txs.into_iter().collect()
}

/// Find the continuous minimum TxID, which can be marked as finality transactions.
Expand Down Expand Up @@ -536,8 +560,14 @@ where

let span = Span::enter_with_local_parent("database commit transitions");
let mut rewards = 0;
let rewards_start_txid = match self.rewards_accumulators.range(..self.num_finality_txs).next_back() {
Some((txid, _)) => *txid,
None => start_txid,
};
for txid in start_txid..self.num_finality_txs {
rewards += tx_states[txid].execute_result.rewards;
if txid >= rewards_start_txid {
rewards += tx_states[txid].execute_result.rewards;
}
database
.commit_transition(std::mem::take(&mut tx_states[txid].execute_result.transition));
self.results.push(tx_states[txid].execute_result.result.clone().unwrap());
@@ -562,11 +592,18 @@
#[fastrace::trace]
fn validate_transactions(&mut self) -> Result<(), GrevmError<DB::Error>> {
let start = Instant::now();
let max_miner_involved_tx = self.generate_unconfirmed_txs();
let miner_involved_txs = self.generate_unconfirmed_txs();
let finality_tx_cnt = self.find_continuous_min_txid()?;
// update and pruning tx dependencies
self.update_and_pruning_dependency(max_miner_involved_tx);
self.update_and_pruning_dependency();
self.commit_transition(finality_tx_cnt)?;
let mut rewards_accumulators = RewardsAccumulators::new();
for txid in miner_involved_txs {
if txid > self.num_finality_txs {
rewards_accumulators.insert(txid, RewardsAccumulator::new(txid - self.num_finality_txs));
}
}
self.rewards_accumulators = Arc::new(rewards_accumulators);
self.metrics.validate_time.increment(start.elapsed().as_nanos() as u64);
Ok(())
}
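The waiting side of this hand-off presumably lives in the third changed file, which did not render in this view, so the following is only a standalone sketch of the counter-plus-`Notify` pattern that `RewardsAccumulator` and `report_rewards` encode: `accumulate_num` reporters each add their rewards and bump the counter, the last one wakes the single waiter, and the waiter re-checks the counter so an early wakeup cannot slip through. The `Accumulator` struct, the reward values, and the use of `AtomicU64` (instead of the commit's `Atomic<u128>` from the `atomic` crate) are assumptions made for the example, not code from this repository.

```rust
use std::sync::{
    atomic::{AtomicU64, AtomicUsize, Ordering},
    Arc,
};
use tokio::sync::Notify;

// Simplified stand-in for RewardsAccumulator: a fixed number of expected
// reports, a running counter, the accumulated rewards, and a notifier.
struct Accumulator {
    accumulate_num: usize,
    counter: AtomicUsize,
    rewards: AtomicU64,
    notifier: Notify,
}

#[tokio::main]
async fn main() {
    let acc = Arc::new(Accumulator {
        accumulate_num: 4,
        counter: AtomicUsize::new(0),
        rewards: AtomicU64::new(0),
        notifier: Notify::new(),
    });

    // Four "earlier transactions" report their miner rewards concurrently.
    for reward in [10u64, 20, 30, 40] {
        let acc = Arc::clone(&acc);
        tokio::spawn(async move {
            acc.rewards.fetch_add(reward, Ordering::Release);
            let prev = acc.counter.fetch_add(1, Ordering::Release);
            if prev == acc.accumulate_num - 1 {
                // The last reporter wakes the waiter; Notify stores a permit
                // if the waiter has not started waiting yet.
                acc.notifier.notify_one();
            }
        });
    }

    // The "miner-involved transaction" waits until every report has landed,
    // re-checking the counter to guard against waking before all reports arrive.
    while acc.counter.load(Ordering::Acquire) < acc.accumulate_num {
        acc.notifier.notified().await;
    }
    assert_eq!(acc.rewards.load(Ordering::Acquire), 100);
}
```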