diff --git a/Cargo.toml b/Cargo.toml index 109d42ef3..fa1480022 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -410,6 +410,7 @@ reth-trie = { path = "crates/trie/trie" } reth-trie-common = { path = "crates/trie/common" } reth-trie-db = { path = "crates/trie/db" } reth-trie-parallel = { path = "crates/trie/parallel" } +gravity-storage = { path = "crates/gravity-storage" } # revm revm = { package = "revm", git = "https://github.com/galxe/revm", rev = "a32cd92", features = [ diff --git a/crates/gravity-storage/src/block_view_storage/mod.rs b/crates/gravity-storage/src/block_view_storage/mod.rs index 40e876e1d..af36b608f 100644 --- a/crates/gravity-storage/src/block_view_storage/mod.rs +++ b/crates/gravity-storage/src/block_view_storage/mod.rs @@ -4,11 +4,11 @@ use reth_primitives::{revm_primitives::Bytecode, Address, B256, U256}; use reth_revm::database::StateProviderDatabase; use reth_storage_api::{errors::provider::ProviderError, StateProviderBox, StateProviderFactory}; use reth_trie::{updates::TrieUpdates, HashedPostState}; -use revm::{db::BundleState, primitives::AccountInfo, Database, DatabaseRef}; -use tokio::{sync::Mutex, time::{sleep, Duration}}; -use std::{ - collections::BTreeMap, - sync::Arc, +use revm::{db::BundleState, primitives::AccountInfo, DatabaseRef}; +use std::{collections::BTreeMap, sync::Arc}; +use tokio::{ + sync::Mutex, + time::{sleep, Duration}, }; use crate::GravityStorage; @@ -19,21 +19,20 @@ pub struct BlockViewStorage { inner: Mutex, } -pub struct BlockViewStorageInner { +struct BlockViewStorageInner { state_provider_info: (B256, u64), // (block_hash, block_number), block_number_to_view: BTreeMap>, block_number_to_hash: BTreeMap, block_number_to_id: BTreeMap, } - -async fn get_state_provider( +async fn get_state_provider( client: &Client, block_hash: B256, ) -> StateProviderBox { loop { let state_provider = client.state_by_block_hash(block_hash); - + match state_provider { Ok(state_provider) => break state_provider, Err(ProviderError::BlockHashNotFound(_)) => { @@ -57,12 +56,9 @@ async fn get_state_provider( } } -impl BlockViewStorage { +impl BlockViewStorage { fn new(client: Client, block_number: u64, block_hash: B256) -> Self { - Self { - client: client, - inner: Mutex::new(BlockViewStorageInner::new(block_number, block_hash)), - } + Self { client, inner: Mutex::new(BlockViewStorageInner::new(block_number, block_hash)) } } } @@ -78,8 +74,11 @@ impl BlockViewStorageInner { } #[async_trait] -impl GravityStorage for BlockViewStorage { - async fn get_state_view(&self, target_block_number: u64) -> (B256, Arc>) { +impl GravityStorage for BlockViewStorage { + async fn get_state_view( + &self, + target_block_number: u64, + ) -> (B256, Arc>) { let mut block_views = vec![]; let mut block_id = B256::ZERO; let mut block_hash; @@ -88,12 +87,16 @@ impl GravityStorage for BlockViewStorage { let storage = self.inner.lock().await; block_hash = storage.state_provider_info.0; if storage.block_number_to_view.get(&target_block_number).is_some() { - storage.block_number_to_view.iter().rev().for_each(|(block_number, block_view)| { - let block_number = *block_number; - if storage.state_provider_info.1 < block_number && block_number <= target_block_number { - block_views.push(block_view.clone()); - } - }); + storage.block_number_to_view.iter().rev().for_each( + |(block_number, block_view)| { + let block_number = *block_number; + if storage.state_provider_info.1 < block_number && + block_number <= target_block_number + { + block_views.push(block_view.clone()); + } + }, + ); block_id = *storage.block_number_to_id.get(&target_block_number).unwrap(); block_hash = storage.state_provider_info.0; break; @@ -103,10 +106,16 @@ impl GravityStorage for BlockViewStorage { } sleep(Duration::from_millis(100)).await; } - (block_id, Arc::new(BlockViewProvider::new(block_views, get_state_provider(&self.client, block_hash).await))) + ( + block_id, + Arc::new(BlockViewProvider::new( + block_views, + get_state_provider(&self.client, block_hash).await, + )), + ) } - async fn commit_state(&mut self, block_id: B256, block_number: u64, bundle_state: BundleState) { + async fn commit_state(&self, block_id: B256, block_number: u64, bundle_state: &BundleState) { let mut cached = CachedReads::default(); for (addr, acc) in bundle_state.state().iter().map(|(a, acc)| (*a, acc)) { if let Some(info) = acc.info.clone() { @@ -122,25 +131,25 @@ impl GravityStorage for BlockViewStorage { storage.block_number_to_id.insert(block_number, block_id); } - async fn update_block_hash(&mut self, block_number: u64, block_hash: B256) { + async fn insert_block_hash(&self, block_number: u64, block_hash: B256) { let mut storage = self.inner.lock().await; storage.block_number_to_hash.insert(block_number, block_hash); } - async fn get_block_hash_by_block_number(&self, block_number: u64) -> B256 { + async fn block_hash_by_number(&self, block_number: u64) -> B256 { loop { { let storage = self.inner.lock().await; match storage.block_number_to_hash.get(&block_number) { Some(block_hash) => break *block_hash, - None => {}, + None => {} } } sleep(Duration::from_millis(100)).await; } } - async fn update_canonical(&mut self, block_number: u64) { + async fn update_canonical(&self, block_number: u64) { let mut storage = self.inner.lock().await; assert!(block_number > storage.state_provider_info.1); let gc_block_number = storage.state_provider_info.1; @@ -151,8 +160,12 @@ impl GravityStorage for BlockViewStorage { storage.block_number_to_id.remove(&gc_block_number); } - async fn state_root_with_updates(&self, block_number: u64, bundle_state: BundleState) -> (B256, TrieUpdates) { - let block_hash = self.get_block_hash_by_block_number(block_number).await; + async fn state_root_with_updates( + &self, + block_number: u64, + bundle_state: &BundleState, + ) -> (B256, TrieUpdates) { + let block_hash = self.block_hash_by_number(block_number - 1).await; let state_provider = get_state_provider(&self.client, block_hash).await; let hashed_state = HashedPostState::from_bundle_state(&bundle_state.state); state_provider.state_root_with_updates(hashed_state).unwrap() @@ -166,17 +179,14 @@ pub struct BlockViewProvider { impl BlockViewProvider { fn new(block_views: Vec>, state_provider: StateProviderBox) -> Self { - Self { - block_views, - db: StateProviderDatabase::new(state_provider), - } + Self { block_views, db: StateProviderDatabase::new(state_provider) } } } -impl Database for BlockViewProvider { +impl DatabaseRef for BlockViewProvider { type Error = ProviderError; - fn basic(&mut self, address: Address) -> Result, Self::Error> { + fn basic_ref(&self, address: Address) -> Result, Self::Error> { for block_view in &self.block_views { if let Some(account) = block_view.accounts.get(&address) { return Ok(account.info.clone()); @@ -185,7 +195,7 @@ impl Database for BlockViewProvider { Ok(self.db.basic_ref(address)?) } - fn code_by_hash(&mut self, code_hash: B256) -> Result { + fn code_by_hash_ref(&self, code_hash: B256) -> Result { for block_view in &self.block_views { if let Some(bytecode) = block_view.contracts.get(&code_hash) { return Ok(bytecode.clone()); @@ -194,7 +204,7 @@ impl Database for BlockViewProvider { Ok(self.db.code_by_hash_ref(code_hash)?) } - fn storage(&mut self, address: Address, index: U256) -> Result { + fn storage_ref(&self, address: Address, index: U256) -> Result { for block_view in &self.block_views { if let Some(acc_entry) = block_view.accounts.get(&address) { if let Some(value) = acc_entry.storage.get(&index) { @@ -205,7 +215,7 @@ impl Database for BlockViewProvider { Ok(self.db.storage_ref(address, index)?) } - fn block_hash(&mut self, number: u64) -> Result { + fn block_hash_ref(&self, number: u64) -> Result { for block_view in &self.block_views { if let Some(hash) = block_view.block_hashes.get(&number) { return Ok(*hash); diff --git a/crates/gravity-storage/src/lib.rs b/crates/gravity-storage/src/lib.rs index 0cee9b0a4..b501c5a70 100644 --- a/crates/gravity-storage/src/lib.rs +++ b/crates/gravity-storage/src/lib.rs @@ -4,23 +4,29 @@ use std::sync::Arc; use async_trait::async_trait; use reth_primitives::B256; +use reth_revm::DatabaseRef; use reth_storage_api::errors::provider::ProviderError; use reth_trie::updates::TrieUpdates; use revm::db::BundleState; -use reth_revm::Database; #[async_trait] -pub trait GravityStorage : Send + Sync { - async fn get_state_view(&self, block_number: u64) -> (B256, Arc>); +pub trait GravityStorage: Send + Sync + 'static { + async fn get_state_view( + &self, + block_number: u64, + ) -> (B256, Arc>); - async fn commit_state(&mut self, block_id: B256, block_number: u64, bundle_state: BundleState); - - async fn update_block_hash(&mut self, block_number: u64, block_hash: B256); + async fn commit_state(&self, block_id: B256, block_number: u64, bundle_state: &BundleState); - async fn get_block_hash_by_block_number(&self, block_number:u64) -> B256; + async fn insert_block_hash(&self, block_number: u64, block_hash: B256); - async fn update_canonical(&mut self, block_number: u64); // gc + async fn block_hash_by_number(&self, block_number: u64) -> B256; - async fn state_root_with_updates(&self, block_number: u64, bundle_state: BundleState) -> (B256, TrieUpdates); -} + async fn update_canonical(&self, block_number: u64); // gc + async fn state_root_with_updates( + &self, + block_number: u64, + bundle_state: &BundleState, + ) -> (B256, TrieUpdates); +} diff --git a/crates/pipe-exec-layer-ext-v2/Cargo.toml b/crates/pipe-exec-layer-ext-v2/Cargo.toml index b2aa330a1..778163648 100644 --- a/crates/pipe-exec-layer-ext-v2/Cargo.toml +++ b/crates/pipe-exec-layer-ext-v2/Cargo.toml @@ -16,13 +16,13 @@ reth-primitives.workspace = true reth-rpc-types.workspace = true reth-evm-ethereum.workspace = true reth-chainspec.workspace = true -reth-storage-api.workspace = true reth-revm.workspace = true reth-evm.workspace = true reth-execution-types.workspace = true reth-trie.workspace = true reth-chain-state.workspace = true reth-rpc-types-compat.workspace = true +gravity-storage.workspace = true alloy-primitives.workspace = true tokio.workspace = true once_cell.workspace = true diff --git a/crates/pipe-exec-layer-ext-v2/src/lib.rs b/crates/pipe-exec-layer-ext-v2/src/lib.rs index 6d1d2f7e5..ae021df23 100644 --- a/crates/pipe-exec-layer-ext-v2/src/lib.rs +++ b/crates/pipe-exec-layer-ext-v2/src/lib.rs @@ -14,11 +14,9 @@ use reth_primitives::{ proofs, Address, Block, BlockWithSenders, Header, TransactionSigned, Withdrawals, B64, EMPTY_OMMER_ROOT_HASH, U256, }; -use reth_revm::database::StateProviderDatabase; use reth_rpc_types::ExecutionPayload; -use reth_storage_api::StateProvider; -use reth_trie::{updates::TrieUpdates, HashedPostState}; -use revm::{db::BundleState, State}; +use reth_trie::HashedPostState; +use revm::State; use std::sync::{ atomic::{AtomicU64, Ordering}, Arc, @@ -26,6 +24,7 @@ use std::sync::{ use once_cell::sync::OnceCell; +use gravity_storage::GravityStorage; use reth_rpc_types_compat::engine::payload::block_to_payload_v3; use tokio::sync::{ mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender}, @@ -69,55 +68,29 @@ pub enum PipeExecLayerEvent { /// Owned by EL #[derive(Debug)] -struct PipeExecService { +struct PipeExecService { /// Immutable part of the state - core: Arc, + core: Arc>, /// Receive ordered block from Coordinator ordered_block_rx: UnboundedReceiver, latest_block_header_rx: Receiver<(B256 /* block id */, Header)>, } #[derive(Debug)] -struct Core { +struct Core { /// Send executed block hash to Coordinator executed_block_hash_tx: UnboundedSender, /// Receive verified block hash from Coordinator verified_block_hash_rx: Mutex>, latest_block_header_tx: Sender<(B256 /* block id */, Header)>, - storage: GravityStorage, + storage: Storage, evm_config: EthEvmConfig, chain_spec: Arc, latest_canonical_block_number: AtomicU64, event_tx: UnboundedSender, } -#[derive(Debug)] - -struct GravityStorage {} - -impl GravityStorage { - async fn state_by_block_number(&self, block_number: u64) -> Box { - todo!() - } - - fn commit_state(&self, state: BundleState) { - todo!() - } - - async fn merklization(&self, block_number: u64) -> (B256 /* state root */, TrieUpdates) { - todo!() - } - - async fn block_hash_by_number(&self, block_number: u64) -> B256 { - todo!() - } - - fn insert_block_hash(&self, block_number: u64, block_hash: B256) { - todo!() - } -} - -impl PipeExecService { +impl PipeExecService { async fn run(mut self) { // TODO: get latest block header from storage at startup let mut latest_block_id = B256::default(); @@ -197,7 +170,7 @@ impl PipeExecService { } } -impl Core { +impl Core { /// Push executed block hash to Coordinator and wait for verification result from Coordinator. /// Returns `None` if the channel has been closed. async fn verify_executed_block_hash(&self, block_meta: ExecutedBlockMeta) -> Option<()> { @@ -275,14 +248,14 @@ impl Core { block.header.blob_gas_used = Some(blob_gas_used); } - let state = self.storage.state_by_block_number(block.number - 1).await; - let state = StateProviderDatabase::new(state); - let mut db = State::builder().with_database_ref(state).with_bundle_update().build(); + let (block_id, state) = self.storage.get_state_view(block.number - 1).await; + assert_eq!(block_id, ordered_block.id); + let db = State::builder().with_database_ref(state).with_bundle_update().build(); let executor_provider = EthExecutorProvider::new(self.chain_spec.clone(), self.evm_config.clone()); let executor_outcome = executor_provider - .executor(&mut db) + .executor(db) .execute(BlockExecutionInput { block: &block, total_difficulty: block_env.difficulty }) .unwrap_or_else(|err| { panic!("failed to execute block {:?}: {:?}", ordered_block.id, err) @@ -301,7 +274,9 @@ impl Core { // block before execution. self.latest_block_header_tx.send((ordered_block.id, block.header.clone())).await.unwrap(); - self.storage.commit_state(executor_outcome.state.clone()); + self.storage + .commit_state(ordered_block.id, ordered_block.number, &executor_outcome.state) + .await; if self.chain_spec.is_prague_active_at_timestamp(block.timestamp) { block.requests = Some(executor_outcome.requests.clone().into()); @@ -322,7 +297,8 @@ impl Core { execution_outcome.block_logs_bloom(block.number).expect("Number is in range"); // calculate the state root - let (state_root, trie_output) = self.storage.merklization(block.number).await; + let (state_root, trie_output) = + self.storage.state_root_with_updates(block.number, &execution_outcome.state()).await; let transactions_root = proofs::calculate_transaction_root(&block.body); @@ -358,7 +334,7 @@ impl Core { "block hash verfication successful" ); - self.storage.insert_block_hash(sealed_block.number, sealed_block.hash()); + self.storage.insert_block_hash(sealed_block.number, sealed_block.hash()).await; // create the executed block data let hashed_state = HashedPostState::from_bundle_state(&execution_outcome.state().state); @@ -413,7 +389,10 @@ pub struct PipeExecLayerExt { pub static PIPE_EXEC_LAYER_EXT: OnceCell = OnceCell::new(); /// Create a new `PipeExecLayerApi` instance and launch a `PipeExecService`. -pub fn new_pipe_exec_layer_api(chain_spec: Arc) -> PipeExecLayerApi { +pub fn new_pipe_exec_layer_api( + chain_spec: Arc, + storage: Storage, +) -> PipeExecLayerApi { let (ordered_block_tx, ordered_block_rx) = tokio::sync::mpsc::unbounded_channel(); let (executed_block_hash_tx, executed_block_hash_rx) = tokio::sync::mpsc::unbounded_channel(); let (verified_block_hash_tx, verified_block_hash_rx) = tokio::sync::mpsc::unbounded_channel(); @@ -425,7 +404,7 @@ pub fn new_pipe_exec_layer_api(chain_spec: Arc) -> PipeExecLayerApi { executed_block_hash_tx, verified_block_hash_rx: Mutex::new(verified_block_hash_rx), latest_block_header_tx, - storage: GravityStorage {}, + storage, evm_config: EthEvmConfig::new(chain_spec.clone()), chain_spec, latest_canonical_block_number: AtomicU64::new(0),