From 64bff3980edd0c7d6d6df45f73c6b2a00da2caf5 Mon Sep 17 00:00:00 2001
From: nekomoto911
Date: Thu, 26 Dec 2024 02:54:09 +0800
Subject: [PATCH] implement pipe-exec-layer-ext-v2

---
 Cargo.toml                               |   2 +
 crates/engine/tree/Cargo.toml            |   1 +
 crates/engine/tree/src/tree/mod.rs       |  69 +++-
 crates/pipe-exec-layer-ext-v2/Cargo.toml |  34 ++
 crates/pipe-exec-layer-ext-v2/src/lib.rs | 454 +++++++++++++++++++++++
 crates/pipe-exec-layer-ext/src/lib.rs    |   4 +-
 6 files changed, 560 insertions(+), 4 deletions(-)
 create mode 100644 crates/pipe-exec-layer-ext-v2/Cargo.toml
 create mode 100644 crates/pipe-exec-layer-ext-v2/src/lib.rs

diff --git a/Cargo.toml b/Cargo.toml
index cdbb0eeaf..109d42ef3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -84,6 +84,7 @@ members = [
     "crates/payload/primitives/",
     "crates/payload/validator/",
     "crates/pipe-exec-layer-ext/",
+    "crates/pipe-exec-layer-ext-v2/",
     "crates/primitives-traits/",
     "crates/primitives/",
     "crates/prune/prune",
@@ -372,6 +373,7 @@ reth-payload-builder = { path = "crates/payload/builder" }
 reth-payload-primitives = { path = "crates/payload/primitives" }
 reth-payload-validator = { path = "crates/payload/validator" }
 reth-pipe-exec-layer-ext = { path = "crates/pipe-exec-layer-ext" }
+reth-pipe-exec-layer-ext-v2 = { path = "crates/pipe-exec-layer-ext-v2" }
 reth-primitives = { path = "crates/primitives", default-features = false, features = [
     "std",
 ] }
diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml
index 8683265e1..b08248932 100644
--- a/crates/engine/tree/Cargo.toml
+++ b/crates/engine/tree/Cargo.toml
@@ -35,6 +35,7 @@ reth-tasks.workspace = true
 reth-trie.workspace = true
 reth-trie-parallel.workspace = true
 reth-pipe-exec-layer-ext.workspace = true
+reth-pipe-exec-layer-ext-v2.workspace = true
 
 # common
 futures.workspace = true
diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs
index dbeb879e5..091256fc2 100644
--- a/crates/engine/tree/src/tree/mod.rs
+++ b/crates/engine/tree/src/tree/mod.rs
@@ -55,9 +55,8 @@ use std::{
     time::Instant,
 };
 use tokio::sync::{
-    mpsc::{UnboundedReceiver, UnboundedSender},
-    oneshot,
-    oneshot::error::TryRecvError,
+    mpsc::{self, UnboundedReceiver, UnboundedSender},
+    oneshot::{self, error::TryRecvError},
 };
 use tracing::*;
 
@@ -69,6 +68,9 @@ pub use config::TreeConfig;
 pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
 pub use reth_engine_primitives::InvalidBlockHook;
 use reth_pipe_exec_layer_ext::PIPE_EXEC_LAYER_EXT;
+use reth_pipe_exec_layer_ext_v2::{
+    PipeExecLayerEvent, PIPE_EXEC_LAYER_EXT as PIPE_EXEC_LAYER_EXT_V2,
+};
 
 /// Keeps track of the state of the tree.
 ///
@@ -624,6 +626,58 @@ where
         (incoming, outgoing)
     }
 
+    fn try_recv_pipe_exec_event(&self) -> Result<Option<PipeExecLayerEvent>, RecvError> {
+        if let Some(ext) = PIPE_EXEC_LAYER_EXT_V2.get() {
+            if self.persistence_state.in_progress() {
+                let mut waited_time_ms = 0;
+                loop {
+                    match ext.event_rx.blocking_lock().try_recv() {
+                        Ok(event) => return Ok(Some(event)),
+                        Err(mpsc::error::TryRecvError::Empty) => {
+                            if waited_time_ms > 500 {
+                                // timeout
+                                return Ok(None);
+                            }
+                            std::thread::sleep(std::time::Duration::from_millis(10));
+                            waited_time_ms += 10;
+                        }
+                        Err(mpsc::error::TryRecvError::Disconnected) => return Err(RecvError),
+                    }
+                }
+            } else {
+                let event = ext.event_rx.blocking_lock().blocking_recv();
+                if event.is_some() {
+                    Ok(event)
+                } else {
+                    Err(RecvError)
+                }
+            }
+        } else {
+            Ok(None)
+        }
+    }
+
+    fn on_pipe_exec_event(&mut self, event: PipeExecLayerEvent) {
+        match event {
+            PipeExecLayerEvent::InsertExecutedBlock(block, tx) => {
+                debug!(target: "on_pipe_exec_event", block_number = %block.block().number, block_hash = %block.block().hash(), "Received insert executed block event");
+                self.state.tree_state.insert_executed(block);
+                tx.send(()).unwrap();
+            }
+            PipeExecLayerEvent::MakeCanonical(payload, tx) => {
+                let block_number = payload.block_number();
+                let block_hash = payload.block_hash();
+                debug!(target: "on_pipe_exec_event", block_number = %block_number, block_hash = %block_hash, "Received make canonical event");
+                self.on_new_payload(payload, None).unwrap_or_else(|err| {
+                    panic!(
+                        "Failed to make canonical, block_number={block_number} block_hash={block_hash}: {err}",
+                    )
+                });
+                tx.send(()).unwrap();
+            }
+        }
+    }
+
     /// Returns a new [`Sender`] to send messages to this type.
     pub fn sender(&self) -> Sender>> {
         self.incoming_tx.clone()
@@ -634,6 +688,15 @@ where
     /// This will block the current thread and process incoming messages.
     pub fn run(mut self) {
         loop {
+            match self.try_recv_pipe_exec_event() {
+                Ok(Some(event)) => self.on_pipe_exec_event(event),
+                Ok(None) => {}
+                Err(RecvError) => {
+                    error!(target: "engine::tree", "Pipe exec layer channel disconnected");
+                    return
+                }
+            }
+
             match self.try_recv_engine_message() {
                 Ok(Some(msg)) => {
                     debug!(target: "engine::tree", %msg, "received new engine message");
diff --git a/crates/pipe-exec-layer-ext-v2/Cargo.toml b/crates/pipe-exec-layer-ext-v2/Cargo.toml
new file mode 100644
index 000000000..b2aa330a1
--- /dev/null
+++ b/crates/pipe-exec-layer-ext-v2/Cargo.toml
@@ -0,0 +1,34 @@
+[package]
+name = "reth-pipe-exec-layer-ext-v2"
+version.workspace = true
+edition.workspace = true
+homepage.workspace = true
+license.workspace = true
+repository.workspace = true
+rust-version.workspace = true
+description = "pipeline execution layer extension"
+
+[lints]
+workspace = true
+
+[dependencies]
+reth-primitives.workspace = true
+reth-rpc-types.workspace = true
+reth-evm-ethereum.workspace = true
+reth-chainspec.workspace = true
+reth-storage-api.workspace = true
+reth-revm.workspace = true
+reth-evm.workspace = true
+reth-execution-types.workspace = true
+reth-trie.workspace = true
+reth-chain-state.workspace = true
+reth-rpc-types-compat.workspace = true
+alloy-primitives.workspace = true
+tokio.workspace = true
+once_cell.workspace = true
+
+# ethereum
+revm.workspace = true
+
+# misc
+tracing.workspace = true
diff --git a/crates/pipe-exec-layer-ext-v2/src/lib.rs b/crates/pipe-exec-layer-ext-v2/src/lib.rs
new file mode 100644
index 000000000..6d1d2f7e5
--- /dev/null
+++ b/crates/pipe-exec-layer-ext-v2/src/lib.rs
@@ -0,0 +1,454 @@
+//! Pipeline execution layer extension
+
+use alloy_primitives::B256;
+use reth_chain_state::ExecutedBlock;
+use reth_chainspec::{ChainSpec, EthereumHardforks};
+use reth_evm::{
+    execute::{BlockExecutionInput, BlockExecutorProvider, Executor},
+    ConfigureEvmEnv, NextBlockEnvAttributes,
+};
+use reth_evm_ethereum::{execute::EthExecutorProvider, EthEvmConfig};
+use reth_execution_types::ExecutionOutcome;
+use reth_primitives::{
+    constants::{BEACON_NONCE, EMPTY_WITHDRAWALS},
+    proofs, Address, Block, BlockWithSenders, Header, TransactionSigned, Withdrawals, B64,
+    EMPTY_OMMER_ROOT_HASH, U256,
+};
+use reth_revm::database::StateProviderDatabase;
+use reth_rpc_types::ExecutionPayload;
+use reth_storage_api::StateProvider;
+use reth_trie::{updates::TrieUpdates, HashedPostState};
+use revm::{db::BundleState, State};
+use std::sync::{
+    atomic::{AtomicU64, Ordering},
+    Arc,
+};
+
+use once_cell::sync::OnceCell;
+
+use reth_rpc_types_compat::engine::payload::block_to_payload_v3;
+use tokio::sync::{
+    mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender},
+    oneshot, Mutex,
+};
+
+use tracing::*;
+
+#[derive(Debug, Clone, Copy)]
+pub struct ExecutedBlockMeta {
+    /// Id of the ordered block that was executed
+    pub block_id: B256,
+    /// Block hash of the executed block
+    pub block_hash: B256,
+}
+
+#[derive(Debug)]
+pub struct OrderedBlock {
+    /// BlockId of the parent block generated by Gravity SDK
+    pub parent_id: B256,
+    /// BlockId of the block generated by Gravity SDK
+    pub id: B256,
+    pub number: u64,
+    pub timestamp: u64,
+    pub coinbase: Address,
+    pub prev_randao: B256,
+    pub withdrawals: Withdrawals,
+    /// Ordered transactions in the block
+    pub transactions: Vec<TransactionSigned>,
+    /// Senders of the transactions in the block
+    pub senders: Vec<Address>,
+}
+
+#[derive(Debug)]
+pub enum PipeExecLayerEvent {
+    /// Insert an executed block into the state tree
+    InsertExecutedBlock(ExecutedBlock, oneshot::Sender<()>),
+    /// Make an executed block canonical
+    MakeCanonical(ExecutionPayload, oneshot::Sender<()>),
+}
+
+/// Owned by EL
+#[derive(Debug)]
+struct PipeExecService {
+    /// Immutable part of the state
+    core: Arc<Core>,
+    /// Receive ordered block from Coordinator
+    ordered_block_rx: UnboundedReceiver<OrderedBlock>,
+    latest_block_header_rx: Receiver<(B256 /* block id */, Header)>,
+}
+
+#[derive(Debug)]
+struct Core {
+    /// Send executed block hash to Coordinator
+    executed_block_hash_tx: UnboundedSender<ExecutedBlockMeta>,
+    /// Receive verified block hash from Coordinator
+    verified_block_hash_rx: Mutex<UnboundedReceiver<ExecutedBlockMeta>>,
+    latest_block_header_tx: Sender<(B256 /* block id */, Header)>,
+    storage: GravityStorage,
+    evm_config: EthEvmConfig,
+    chain_spec: Arc<ChainSpec>,
+    latest_canonical_block_number: AtomicU64,
+    event_tx: UnboundedSender<PipeExecLayerEvent>,
+}
+
+#[derive(Debug)]
+
+struct GravityStorage {}
+
+impl GravityStorage {
+    async fn state_by_block_number(&self, block_number: u64) -> Box<dyn StateProvider> {
+        todo!()
+    }
+
+    fn commit_state(&self, state: BundleState) {
+        todo!()
+    }
+
+    async fn merklization(&self, block_number: u64) -> (B256 /* state root */, TrieUpdates) {
+        todo!()
+    }
+
+    async fn block_hash_by_number(&self, block_number: u64) -> B256 {
+        todo!()
+    }
+
+    fn insert_block_hash(&self, block_number: u64, block_hash: B256) {
+        todo!()
+    }
+}
+
+impl PipeExecService {
+    async fn run(mut self) {
+        // TODO: get latest block header from storage at startup
+        let mut latest_block_id = B256::default();
+        let latest_block_header = Header::default();
+        let mut latest_block_number = latest_block_header.number;
+        self.core
+            .latest_block_header_tx
+            .send((latest_block_id, latest_block_header))
+            .await
+            .unwrap();
+        self.core.latest_canonical_block_number.store(latest_block_number, Ordering::Release);
+
+        loop {
+            // get ordered block from queue
+            let ordered_block = match self.ordered_block_rx.recv().await {
+                Some(ordered_block) => ordered_block,
+                None => break,
+            };
+            assert_eq!(ordered_block.parent_id, latest_block_id);
+            assert_eq!(ordered_block.number, latest_block_number + 1);
+            latest_block_id = ordered_block.id;
+            latest_block_number = ordered_block.number;
+
+            // Retrieve the parent block header to generate the necessary configs for executing the
+            // current block
+            let (parent_id, parent_block_header) =
+                self.latest_block_header_rx.recv().await.unwrap();
+            assert_eq!(ordered_block.parent_id, parent_id);
+            assert_eq!(ordered_block.number, parent_block_header.number + 1);
+
+            tokio::spawn({
+                let core = self.core.clone();
+                async move {
+                    let block_number = ordered_block.number;
+                    let executed_block = core
+                        .execute_and_verify_ordered_block(ordered_block, parent_block_header)
+                        .await;
+
+                    let payload: reth_rpc_types::ExecutionPayloadV3 =
+                        block_to_payload_v3(executed_block.block.as_ref().clone());
+
+                    // Ensure that blocks are inserted and made canonical in block number order
+                    while core.latest_canonical_block_number.load(Ordering::Acquire) + 1 <
+                        block_number
+                    {
+                        tokio::task::yield_now().await;
+                    }
+                    assert_eq!(
+                        core.latest_canonical_block_number.load(Ordering::Relaxed) + 1,
+                        block_number
+                    );
+
+                    // Insert the executed block into the state tree
+                    let (tx, rx) = oneshot::channel();
+                    core.event_tx
+                        .send(PipeExecLayerEvent::InsertExecutedBlock(executed_block, tx))
+                        .unwrap();
+                    rx.await.unwrap();
+
+                    debug!(target: "PipeExecService", block_number=?block_number, "block inserted");
+
+                    // Make the executed block canonical
+                    let (tx, rx) = oneshot::channel();
+                    core.event_tx
+                        .send(PipeExecLayerEvent::MakeCanonical(
+                            ExecutionPayload::from(payload),
+                            tx,
+                        ))
+                        .unwrap();
+                    rx.await.unwrap();
+                    core.latest_canonical_block_number.store(block_number, Ordering::Release);
+
+                    debug!(target: "PipeExecService", block_number=?block_number, "block made canonical");
+                }
+            });
+        }
+    }
+}
+
+impl Core {
+    /// Push executed block hash to Coordinator and wait for verification result from Coordinator.
+    /// Returns `None` if the channel has been closed.
+    async fn verify_executed_block_hash(&self, block_meta: ExecutedBlockMeta) -> Option<()> {
+        self.executed_block_hash_tx.send(block_meta).ok()?;
+        let meta = self.verified_block_hash_rx.lock().await.recv().await?;
+        assert_eq!(block_meta.block_id, meta.block_id);
+        assert_eq!(block_meta.block_hash, meta.block_hash);
+        Some(())
+    }
+
+    async fn execute_and_verify_ordered_block(
+        &self,
+        ordered_block: OrderedBlock,
+        parent_header: Header,
+    ) -> ExecutedBlock {
+        debug!(target: "execute_and_verify_ordered_block",
+            id=?ordered_block.id,
+            parent_id=?ordered_block.parent_id,
+            number=?ordered_block.number,
+            "new ordered block"
+        );
+
+        let (_, block_env) = self.evm_config.next_cfg_and_block_env(
+            &parent_header,
+            NextBlockEnvAttributes {
+                timestamp: ordered_block.timestamp,
+                suggested_fee_recipient: ordered_block.coinbase,
+                prev_randao: ordered_block.prev_randao,
+            },
+        );
+
+        let mut block = BlockWithSenders {
+            block: Block {
+                header: Header {
+                    ommers_hash: EMPTY_OMMER_ROOT_HASH,
+                    beneficiary: ordered_block.coinbase,
+                    timestamp: ordered_block.timestamp,
+                    mix_hash: ordered_block.prev_randao,
+                    nonce: BEACON_NONCE,
+                    base_fee_per_gas: Some(block_env.basefee.to::<u64>()),
+                    number: ordered_block.number,
+                    gas_limit: block_env
+                        .gas_limit
+                        .try_into()
+                        .unwrap_or(self.chain_spec.max_gas_limit),
+                    difficulty: U256::ZERO,
+                    excess_blob_gas: block_env.blob_excess_gas_and_price.map(|v| v.excess_blob_gas),
+                    ..Default::default()
+                },
+                body: ordered_block.transactions,
+                ..Default::default()
+            },
+            senders: ordered_block.senders,
+        };
+
+        if self.chain_spec.is_shanghai_active_at_timestamp(block.timestamp) {
+            if ordered_block.withdrawals.is_empty() {
+                block.header.withdrawals_root = Some(EMPTY_WITHDRAWALS);
+                block.withdrawals = Some(Withdrawals::default());
+            } else {
+                block.header.withdrawals_root =
+                    Some(proofs::calculate_withdrawals_root(&ordered_block.withdrawals));
+                block.withdrawals = Some(ordered_block.withdrawals);
+            }
+        }
+
+        // only determine cancun fields when active
+        if self.chain_spec.is_cancun_active_at_timestamp(block.timestamp) {
+            let mut blob_gas_used: u64 = 0;
+            for tx in &block.body {
+                if let Some(blob_tx) = tx.transaction.as_eip4844() {
+                    blob_gas_used += blob_tx.blob_gas();
+                }
+            }
+            block.header.blob_gas_used = Some(blob_gas_used);
+        }
+
+        let state = self.storage.state_by_block_number(block.number - 1).await;
+        let state = StateProviderDatabase::new(state);
+        let mut db = State::builder().with_database_ref(state).with_bundle_update().build();
+
+        let executor_provider =
+            EthExecutorProvider::new(self.chain_spec.clone(), self.evm_config.clone());
+        let executor_outcome = executor_provider
+            .executor(&mut db)
+            .execute(BlockExecutionInput { block: &block, total_difficulty: block_env.difficulty })
+            .unwrap_or_else(|err| {
+                panic!("failed to execute block {:?}: {:?}", ordered_block.id, err)
+            });
+
+        debug!(target: "execute_and_verify_ordered_block",
+            id=?ordered_block.id,
+            parent_id=?ordered_block.parent_id,
+            number=?ordered_block.number,
+            "block executed"
+        );
+
+        block.header.gas_used = executor_outcome.gas_used;
+
+        // All header fields needed to execute the next block are now filled, so the header can be
+        // consumed by the next block's execution.
+        self.latest_block_header_tx.send((ordered_block.id, block.header.clone())).await.unwrap();
+
+        self.storage.commit_state(executor_outcome.state.clone());
+
+        if self.chain_spec.is_prague_active_at_timestamp(block.timestamp) {
+            block.requests = Some(executor_outcome.requests.clone().into());
+            block.header.requests_root =
+                Some(proofs::calculate_requests_root(&executor_outcome.requests));
+        }
+
+        let execution_outcome = ExecutionOutcome::new(
+            executor_outcome.state,
+            vec![executor_outcome.receipts.into_iter().map(|r| Some(r)).collect::<Vec<_>>()].into(),
+            block.number,
+            vec![executor_outcome.requests.into()],
+        );
+
+        let receipts_root =
+            execution_outcome.receipts_root_slow(block.number).expect("Number is in range");
+        let logs_bloom =
+            execution_outcome.block_logs_bloom(block.number).expect("Number is in range");
+
+        // calculate the state root
+        let (state_root, trie_output) = self.storage.merklization(block.number).await;
+
+        let transactions_root = proofs::calculate_transaction_root(&block.body);
+
+        // Fill the block header with the calculated values
+        block.header.state_root = state_root;
+        block.header.transactions_root = transactions_root;
+        block.header.receipts_root = receipts_root;
+        block.header.logs_bloom = logs_bloom;
+
+        block.header.parent_hash = self.storage.block_hash_by_number(block.number - 1).await;
+
+        let sealed_block = block.seal_slow();
+
+        debug!(target: "execute_and_verify_ordered_block", id=?ordered_block.id,
+            parent_id=?ordered_block.parent_id,
+            number=?ordered_block.number,
+            hash=?sealed_block.hash(),
+            "block sealed"
+        );
+
+        self.verify_executed_block_hash(ExecutedBlockMeta {
+            block_id: ordered_block.id,
+            block_hash: sealed_block.hash(),
+        })
+        .await
+        .unwrap();
+
+        debug!(target: "execute_and_verify_ordered_block",
+            id=?ordered_block.id,
+            parent_id=?ordered_block.parent_id,
+            number=?ordered_block.number,
+            hash=?sealed_block.hash(),
+            "block hash verification successful"
+        );
+
+        self.storage.insert_block_hash(sealed_block.number, sealed_block.hash());
+
+        // create the executed block data
+        let hashed_state = HashedPostState::from_bundle_state(&execution_outcome.state().state);
+        ExecutedBlock {
+            block: Arc::new(sealed_block.block),
+            senders: Arc::new(sealed_block.senders),
+            execution_output: Arc::new(execution_outcome),
+            hashed_state: Arc::new(hashed_state),
+            trie: Arc::new(trie_output),
+        }
+    }
+}
+
+/// Called by Coordinator
+#[derive(Debug)]
+pub struct PipeExecLayerApi {
+    ordered_block_tx: UnboundedSender<OrderedBlock>,
+    executed_block_hash_rx: Mutex<UnboundedReceiver<ExecutedBlockMeta>>,
+    verified_block_hash_tx: UnboundedSender<ExecutedBlockMeta>,
+}
+
+impl PipeExecLayerApi {
+    /// Push ordered block to EL for execution.
+    /// Returns `None` if the channel has been closed.
+    pub fn push_ordered_block(&self, block: OrderedBlock) -> Option<()> {
+        self.ordered_block_tx.send(block).ok()
+    }
+
+    /// Pull executed block hash from EL for verification.
+    /// Returns `None` if the channel has been closed.
+    pub async fn pull_executed_block_hash(&self, block_id: B256) -> Option<B256> {
+        let block_meta = self.executed_block_hash_rx.lock().await.recv().await?;
+        assert_eq!(block_id, block_meta.block_id);
+        Some(block_meta.block_hash)
+    }
+
+    /// Push verified block hash to EL for commit.
+    /// Returns `None` if the channel has been closed.
+    pub fn commit_executed_block_hash(&self, block_meta: ExecutedBlockMeta) -> Option<()> {
+        self.verified_block_hash_tx.send(block_meta).ok()
+    }
+}
+
+/// Called by EL.
+#[derive(Debug)]
+pub struct PipeExecLayerExt {
+    /// Receive events from PipeExecService
+    pub event_rx: Mutex<UnboundedReceiver<PipeExecLayerEvent>>,
+}
+
+/// A static instance of `PipeExecLayerExt` used for dispatching events.
+pub static PIPE_EXEC_LAYER_EXT: OnceCell<PipeExecLayerExt> = OnceCell::new();
+
+/// Create a new `PipeExecLayerApi` instance and launch a `PipeExecService`.
+pub fn new_pipe_exec_layer_api(chain_spec: Arc<ChainSpec>) -> PipeExecLayerApi {
+    let (ordered_block_tx, ordered_block_rx) = tokio::sync::mpsc::unbounded_channel();
+    let (executed_block_hash_tx, executed_block_hash_rx) = tokio::sync::mpsc::unbounded_channel();
+    let (verified_block_hash_tx, verified_block_hash_rx) = tokio::sync::mpsc::unbounded_channel();
+    let (latest_block_header_tx, latest_block_header_rx) = tokio::sync::mpsc::channel(1);
+    let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
+
+    let service = PipeExecService {
+        core: Arc::new(Core {
+            executed_block_hash_tx,
+            verified_block_hash_rx: Mutex::new(verified_block_hash_rx),
+            latest_block_header_tx,
+            storage: GravityStorage {},
+            evm_config: EthEvmConfig::new(chain_spec.clone()),
+            chain_spec,
+            latest_canonical_block_number: AtomicU64::new(0),
+            event_tx,
+        }),
+        ordered_block_rx,
+        latest_block_header_rx,
+    };
+    tokio::spawn(service.run());
+
+    PIPE_EXEC_LAYER_EXT.get_or_init(|| PipeExecLayerExt { event_rx: Mutex::new(event_rx) });
+
+    PipeExecLayerApi {
+        ordered_block_tx,
+        executed_block_hash_rx: Mutex::new(executed_block_hash_rx),
+        verified_block_hash_tx,
+    }
+}
+
+#[cfg(test)]
+mod test {
+    #[test]
+    fn test() {
+        todo!()
+    }
+}
diff --git a/crates/pipe-exec-layer-ext/src/lib.rs b/crates/pipe-exec-layer-ext/src/lib.rs
index 7a96a5b57..eee58f3b4 100644
--- a/crates/pipe-exec-layer-ext/src/lib.rs
+++ b/crates/pipe-exec-layer-ext/src/lib.rs
@@ -1,6 +1,8 @@
+//! Pipeline execution layer extension
+
 use alloy_primitives::B256;
 use once_cell::sync::OnceCell;
-use reth_primitives::{Address, BlockWithSenders, TransactionSigned};
+use reth_primitives::{Address, TransactionSigned};
 use reth_rpc_types::engine::PayloadId;
 use tokio::sync::{
     mpsc::{UnboundedReceiver, UnboundedSender},
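Usage sketch (illustrative, not part of the patch): the Coordinator side is expected to drive the new `PipeExecLayerApi` roughly as below. This assumes `new_pipe_exec_layer_api` was already called during node startup on a running tokio runtime and that the caller assembles each `OrderedBlock` from consensus output; `drive_block` is a hypothetical helper name.

    use alloy_primitives::B256;
    use reth_pipe_exec_layer_ext_v2::{ExecutedBlockMeta, OrderedBlock, PipeExecLayerApi};

    /// Hypothetical Coordinator-side helper: execute one ordered block and commit its hash.
    async fn drive_block(api: &PipeExecLayerApi, block: OrderedBlock) -> Option<B256> {
        let block_id = block.id;
        // Hand the ordered block to the execution layer service.
        api.push_ordered_block(block)?;
        // Wait for the execution layer to report the hash of the executed block.
        let block_hash = api.pull_executed_block_hash(block_id).await?;
        // After consensus verifies the hash, commit it so the block can be made canonical.
        api.commit_executed_block_hash(ExecutedBlockMeta { block_id, block_hash })?;
        Some(block_hash)
    }

Each of these calls returns `None` once the channel to the service is closed, so a `None` here should be treated as shutdown rather than an error.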