Skip to content

Commit

Permalink
fix1
Browse files Browse the repository at this point in the history
  • Loading branch information
AshinGau committed Dec 3, 2024
1 parent 3e8cc66 commit 79347f5
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 14 deletions.
24 changes: 18 additions & 6 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ use ahash::{AHashMap as HashMap, AHashSet as HashSet};
use atomic::Atomic;
use dashmap::DashSet;
use fastrace::Span;
use lazy_static::lazy_static;
use metrics::{gauge, histogram};
use metrics::{counter, gauge, histogram};
use revm::{
primitives::{
AccountInfo, Address, Bytecode, EVMError, Env, ExecutionResult, SpecId, TxEnv, B256, U256,
Expand All @@ -32,6 +31,7 @@ use tokio::sync::Notify;
use tracing::info;

struct ExecuteMetrics {
block_height: metrics::Counter,
/// Number of times parallel execution is called.
parallel_execute_calls: metrics::Gauge,
/// Number of times sequential execution is called.
Expand Down Expand Up @@ -84,6 +84,7 @@ struct ExecuteMetrics {
impl Default for ExecuteMetrics {
fn default() -> Self {
Self {
block_height: counter!("grevm.block_height"),
parallel_execute_calls: gauge!("grevm.parallel_round_calls"),
sequential_execute_calls: gauge!("grevm.sequential_execute_calls"),
total_tx_cnt: histogram!("grevm.total_tx_cnt"),
Expand Down Expand Up @@ -113,6 +114,7 @@ impl Default for ExecuteMetrics {
/// Collect metrics and report
#[derive(Default)]
struct ExecuteMetricsCollector {
block_height: u64,
parallel_execute_calls: u64,
sequential_execute_calls: u64,
total_tx_cnt: u64,
Expand Down Expand Up @@ -140,6 +142,7 @@ struct ExecuteMetricsCollector {
impl ExecuteMetricsCollector {
fn report(&self) {
let execute_metrics = ExecuteMetrics::default();
execute_metrics.block_height.absolute(self.block_height);
execute_metrics.parallel_execute_calls.set(self.parallel_execute_calls as f64);
execute_metrics.sequential_execute_calls.set(self.sequential_execute_calls as f64);
execute_metrics.total_tx_cnt.record(self.total_tx_cnt as f64);
Expand Down Expand Up @@ -337,7 +340,6 @@ where
let coinbase = env.block.coinbase;
let num_partitions = *CPU_CORES * 2 + 1; // 2 * cpu + 1 for initial partition number
let num_txs = txs.len();
info!("Parallel execute {} txs of SpecId {:?}", num_txs, spec_id);
Self {
spec_id,
env,
Expand Down Expand Up @@ -550,12 +552,12 @@ where
/// If the smallest TxID is a conflict transaction, return an error.
#[fastrace::trace]
fn find_continuous_min_txid(&mut self) -> Result<usize, GrevmError<DB::Error>> {
let mut min_execute_time = Duration::from_secs(u64::MAX);
let mut sum_execute_time = Duration::from_secs(0);
let mut max_execute_time = Duration::from_secs(0);
for executor in &self.partition_executors {
let mut executor = executor.write().unwrap();
self.metrics.reusable_tx_cnt += executor.metrics.reusable_tx_cnt;
min_execute_time = min_execute_time.min(executor.metrics.execute_time);
sum_execute_time += executor.metrics.execute_time;
max_execute_time = max_execute_time.max(executor.metrics.execute_time);
if executor.assigned_txs[0] == self.num_finality_txs &&
self.tx_states[self.num_finality_txs].tx_status == TransactionStatus::Conflict
Expand All @@ -568,7 +570,9 @@ where
let mut conflict_tx_cnt = 0;
let mut unconfirmed_tx_cnt = 0;
let mut finality_tx_cnt = 0;
self.metrics.partition_et_diff += (max_execute_time - min_execute_time).as_nanos() as u64;
let avg_execution_time =
sum_execute_time.as_nanos() / self.partition_executors.len() as u128;
self.metrics.partition_et_diff += (max_execute_time.as_nanos() - avg_execution_time) as u64;
#[allow(invalid_reference_casting)]
let tx_states =
unsafe { &mut *(&(*self.tx_states) as *const Vec<TxState> as *mut Vec<TxState>) };
Expand Down Expand Up @@ -755,6 +759,14 @@ where
with_hints: bool,
num_partitions: Option<usize>,
) -> Result<ExecuteOutput, GrevmError<DB::Error>> {
let block_height: u64 = self.env.block.number.try_into().unwrap_or(0);
info!(
"Parallel execute {} txs: block={}, SpecId={:?}",
self.txs.len(),
block_height,
self.spec_id
);
self.metrics.block_height = block_height;
if with_hints {
self.parse_hints();
}
Expand Down
20 changes: 12 additions & 8 deletions src/tx_dependency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
pub(crate) type DependentTxsVec = SmallVec<[TxId; 1]>;

use ahash::{AHashMap as HashMap, AHashSet as HashSet};
use metrics::counter;
use metrics::{counter, histogram};

const RAW_TRANSFER_WEIGHT: usize = 1;

Expand Down Expand Up @@ -214,22 +214,26 @@ impl TxDependency {
if !(*DEBUG_BOTTLENECK) {
return;
}
if let Some(0) = self.round {
counter!("grevm.total_block_cnt").increment(1);
}
let num_finality_txs = self.num_finality_txs;
let num_txs = num_finality_txs + self.tx_dependency.len();
let num_remaining = self.tx_dependency.len();
if num_txs < 64 || num_remaining < num_txs / 3 {
return;
if let Some(0) = self.round {
counter!("grevm.total_block_cnt").increment(1);
}
let mut subgraph = BTreeSet::new();
if let Some((_, groups)) = weighted_group.last_key_value() {
if self.round.is_none() {
let largest_ratio = groups[0].len() as f64 / num_remaining as f64;
histogram!("grevm.large_graph_ratio").record(largest_ratio);
if groups[0].len() >= num_remaining / 2 {
counter!("grevm.low_parallelism_cnt").increment(1);
}
}
if groups[0].len() >= num_remaining / 3 {
subgraph.extend(groups[0].clone());
}
}
if subgraph.is_empty() {
if num_txs < 64 || num_remaining < num_txs / 3 || subgraph.is_empty() {
return;
}

Expand Down Expand Up @@ -265,7 +269,7 @@ impl TxDependency {
if chain_len > graph_len * 2 / 3 {
// Long chain
counter!("grevm.large_graph", "type" => "chain", "tip" => tip.clone()).increment(1);
} else if chain_len < max(3, graph_len / 8) {
} else if chain_len < max(3, graph_len / 6) {
// Star Graph
counter!("grevm.large_graph", "type" => "star", "tip" => tip.clone()).increment(1);
} else {
Expand Down

0 comments on commit 79347f5

Please sign in to comment.