Skip to content

Commit

Permalink
introduce gravity storage in pipe-exec-layer-ext-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
nekomoto911 committed Dec 27, 2024
1 parent 64bff39 commit be3cd07
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 95 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ reth-trie = { path = "crates/trie/trie" }
reth-trie-common = { path = "crates/trie/common" }
reth-trie-db = { path = "crates/trie/db" }
reth-trie-parallel = { path = "crates/trie/parallel" }
gravity-storage = { path = "crates/gravity-storage" }

# revm
revm = { package = "revm", git = "https://github.com/galxe/revm", rev = "a32cd92", features = [
Expand Down
88 changes: 49 additions & 39 deletions crates/gravity-storage/src/block_view_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use reth_primitives::{revm_primitives::Bytecode, Address, B256, U256};
use reth_revm::database::StateProviderDatabase;
use reth_storage_api::{errors::provider::ProviderError, StateProviderBox, StateProviderFactory};
use reth_trie::{updates::TrieUpdates, HashedPostState};
use revm::{db::BundleState, primitives::AccountInfo, Database, DatabaseRef};
use tokio::{sync::Mutex, time::{sleep, Duration}};
use std::{
collections::BTreeMap,
sync::Arc,
use revm::{db::BundleState, primitives::AccountInfo, DatabaseRef};
use std::{collections::BTreeMap, sync::Arc};
use tokio::{
sync::Mutex,
time::{sleep, Duration},
};

use crate::GravityStorage;
Expand All @@ -19,21 +19,20 @@ pub struct BlockViewStorage<Client> {
inner: Mutex<BlockViewStorageInner>,
}

pub struct BlockViewStorageInner {
struct BlockViewStorageInner {
state_provider_info: (B256, u64), // (block_hash, block_number),
block_number_to_view: BTreeMap<u64, Arc<CachedReads>>,
block_number_to_hash: BTreeMap<u64, B256>,
block_number_to_id: BTreeMap<u64, B256>,
}


async fn get_state_provider<Client: StateProviderFactory>(
async fn get_state_provider<Client: StateProviderFactory + 'static>(
client: &Client,
block_hash: B256,
) -> StateProviderBox {
loop {
let state_provider = client.state_by_block_hash(block_hash);

match state_provider {
Ok(state_provider) => break state_provider,
Err(ProviderError::BlockHashNotFound(_)) => {
Expand All @@ -57,12 +56,9 @@ async fn get_state_provider<Client: StateProviderFactory>(
}
}

impl<Client: StateProviderFactory> BlockViewStorage<Client> {
impl<Client: StateProviderFactory + 'static> BlockViewStorage<Client> {
fn new(client: Client, block_number: u64, block_hash: B256) -> Self {
Self {
client: client,
inner: Mutex::new(BlockViewStorageInner::new(block_number, block_hash)),
}
Self { client, inner: Mutex::new(BlockViewStorageInner::new(block_number, block_hash)) }
}
}

Expand All @@ -78,8 +74,11 @@ impl BlockViewStorageInner {
}

#[async_trait]
impl<Client: StateProviderFactory> GravityStorage for BlockViewStorage<Client> {
async fn get_state_view(&self, target_block_number: u64) -> (B256, Arc<dyn Database<Error = ProviderError>>) {
impl<Client: StateProviderFactory + 'static> GravityStorage for BlockViewStorage<Client> {
async fn get_state_view(
&self,
target_block_number: u64,
) -> (B256, Arc<dyn DatabaseRef<Error = ProviderError>>) {
let mut block_views = vec![];
let mut block_id = B256::ZERO;
let mut block_hash;
Expand All @@ -88,12 +87,16 @@ impl<Client: StateProviderFactory> GravityStorage for BlockViewStorage<Client> {
let storage = self.inner.lock().await;
block_hash = storage.state_provider_info.0;
if storage.block_number_to_view.get(&target_block_number).is_some() {
storage.block_number_to_view.iter().rev().for_each(|(block_number, block_view)| {
let block_number = *block_number;
if storage.state_provider_info.1 < block_number && block_number <= target_block_number {
block_views.push(block_view.clone());
}
});
storage.block_number_to_view.iter().rev().for_each(
|(block_number, block_view)| {
let block_number = *block_number;
if storage.state_provider_info.1 < block_number &&
block_number <= target_block_number
{
block_views.push(block_view.clone());
}
},
);
block_id = *storage.block_number_to_id.get(&target_block_number).unwrap();
block_hash = storage.state_provider_info.0;
break;
Expand All @@ -103,10 +106,16 @@ impl<Client: StateProviderFactory> GravityStorage for BlockViewStorage<Client> {
}
sleep(Duration::from_millis(100)).await;
}
(block_id, Arc::new(BlockViewProvider::new(block_views, get_state_provider(&self.client, block_hash).await)))
(
block_id,
Arc::new(BlockViewProvider::new(
block_views,
get_state_provider(&self.client, block_hash).await,
)),
)
}

async fn commit_state(&mut self, block_id: B256, block_number: u64, bundle_state: BundleState) {
async fn commit_state(&self, block_id: B256, block_number: u64, bundle_state: &BundleState) {
let mut cached = CachedReads::default();
for (addr, acc) in bundle_state.state().iter().map(|(a, acc)| (*a, acc)) {
if let Some(info) = acc.info.clone() {
Expand All @@ -122,25 +131,25 @@ impl<Client: StateProviderFactory> GravityStorage for BlockViewStorage<Client> {
storage.block_number_to_id.insert(block_number, block_id);
}

async fn update_block_hash(&mut self, block_number: u64, block_hash: B256) {
async fn insert_block_hash(&self, block_number: u64, block_hash: B256) {
let mut storage = self.inner.lock().await;
storage.block_number_to_hash.insert(block_number, block_hash);
}

async fn get_block_hash_by_block_number(&self, block_number: u64) -> B256 {
async fn block_hash_by_number(&self, block_number: u64) -> B256 {
loop {
{
let storage = self.inner.lock().await;
match storage.block_number_to_hash.get(&block_number) {
Some(block_hash) => break *block_hash,
None => {},
None => {}
}
}
sleep(Duration::from_millis(100)).await;
}
}

async fn update_canonical(&mut self, block_number: u64) {
async fn update_canonical(&self, block_number: u64) {
let mut storage = self.inner.lock().await;
assert!(block_number > storage.state_provider_info.1);
let gc_block_number = storage.state_provider_info.1;
Expand All @@ -151,8 +160,12 @@ impl<Client: StateProviderFactory> GravityStorage for BlockViewStorage<Client> {
storage.block_number_to_id.remove(&gc_block_number);
}

async fn state_root_with_updates(&self, block_number: u64, bundle_state: BundleState) -> (B256, TrieUpdates) {
let block_hash = self.get_block_hash_by_block_number(block_number).await;
async fn state_root_with_updates(
&self,
block_number: u64,
bundle_state: &BundleState,
) -> (B256, TrieUpdates) {
let block_hash = self.block_hash_by_number(block_number - 1).await;
let state_provider = get_state_provider(&self.client, block_hash).await;
let hashed_state = HashedPostState::from_bundle_state(&bundle_state.state);
state_provider.state_root_with_updates(hashed_state).unwrap()
Expand All @@ -166,17 +179,14 @@ pub struct BlockViewProvider {

impl BlockViewProvider {
fn new(block_views: Vec<Arc<CachedReads>>, state_provider: StateProviderBox) -> Self {
Self {
block_views,
db: StateProviderDatabase::new(state_provider),
}
Self { block_views, db: StateProviderDatabase::new(state_provider) }
}
}

impl Database for BlockViewProvider {
impl DatabaseRef for BlockViewProvider {
type Error = ProviderError;

fn basic(&mut self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
fn basic_ref(&self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
for block_view in &self.block_views {
if let Some(account) = block_view.accounts.get(&address) {
return Ok(account.info.clone());
Expand All @@ -185,7 +195,7 @@ impl Database for BlockViewProvider {
Ok(self.db.basic_ref(address)?)
}

fn code_by_hash(&mut self, code_hash: B256) -> Result<Bytecode, Self::Error> {
fn code_by_hash_ref(&self, code_hash: B256) -> Result<Bytecode, Self::Error> {
for block_view in &self.block_views {
if let Some(bytecode) = block_view.contracts.get(&code_hash) {
return Ok(bytecode.clone());
Expand All @@ -194,7 +204,7 @@ impl Database for BlockViewProvider {
Ok(self.db.code_by_hash_ref(code_hash)?)
}

fn storage(&mut self, address: Address, index: U256) -> Result<U256, Self::Error> {
fn storage_ref(&self, address: Address, index: U256) -> Result<U256, Self::Error> {
for block_view in &self.block_views {
if let Some(acc_entry) = block_view.accounts.get(&address) {
if let Some(value) = acc_entry.storage.get(&index) {
Expand All @@ -205,7 +215,7 @@ impl Database for BlockViewProvider {
Ok(self.db.storage_ref(address, index)?)
}

fn block_hash(&mut self, number: u64) -> Result<B256, Self::Error> {
fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
for block_view in &self.block_views {
if let Some(hash) = block_view.block_hashes.get(&number) {
return Ok(*hash);
Expand Down
26 changes: 16 additions & 10 deletions crates/gravity-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,29 @@ use std::sync::Arc;

use async_trait::async_trait;
use reth_primitives::B256;
use reth_revm::DatabaseRef;
use reth_storage_api::errors::provider::ProviderError;
use reth_trie::updates::TrieUpdates;
use revm::db::BundleState;
use reth_revm::Database;

#[async_trait]
pub trait GravityStorage : Send + Sync {
async fn get_state_view(&self, block_number: u64) -> (B256, Arc<dyn Database<Error = ProviderError>>);
pub trait GravityStorage: Send + Sync + 'static {
async fn get_state_view(
&self,
block_number: u64,
) -> (B256, Arc<dyn DatabaseRef<Error = ProviderError>>);

async fn commit_state(&mut self, block_id: B256, block_number: u64, bundle_state: BundleState);

async fn update_block_hash(&mut self, block_number: u64, block_hash: B256);
async fn commit_state(&self, block_id: B256, block_number: u64, bundle_state: &BundleState);

async fn get_block_hash_by_block_number(&self, block_number:u64) -> B256;
async fn insert_block_hash(&self, block_number: u64, block_hash: B256);

async fn update_canonical(&mut self, block_number: u64); // gc
async fn block_hash_by_number(&self, block_number: u64) -> B256;

async fn state_root_with_updates(&self, block_number: u64, bundle_state: BundleState) -> (B256, TrieUpdates);
}
async fn update_canonical(&self, block_number: u64); // gc

async fn state_root_with_updates(
&self,
block_number: u64,
bundle_state: &BundleState,
) -> (B256, TrieUpdates);
}
2 changes: 1 addition & 1 deletion crates/pipe-exec-layer-ext-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ reth-primitives.workspace = true
reth-rpc-types.workspace = true
reth-evm-ethereum.workspace = true
reth-chainspec.workspace = true
reth-storage-api.workspace = true
reth-revm.workspace = true
reth-evm.workspace = true
reth-execution-types.workspace = true
reth-trie.workspace = true
reth-chain-state.workspace = true
reth-rpc-types-compat.workspace = true
gravity-storage.workspace = true
alloy-primitives.workspace = true
tokio.workspace = true
once_cell.workspace = true
Expand Down
Loading

0 comments on commit be3cd07

Please sign in to comment.