diff --git a/based/Cargo.lock b/based/Cargo.lock index a65456716..e263ab6e1 100644 --- a/based/Cargo.lock +++ b/based/Cargo.lock @@ -1020,6 +1020,12 @@ dependencies = [ "derive_arbitrary", ] +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "ark-bls12-381" version = "0.5.0" @@ -1577,6 +1583,53 @@ version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89e25b6adfb930f02d1981565a6e5d9c547ac15a96606256d3b59040e5cd4ca3" +[[package]] +name = "based-op-reth" +version = "0.1.0" +dependencies = [ + "alloy-consensus", + "alloy-eips", + "alloy-network", + "alloy-primitives", + "alloy-rpc-types", + "alloy-rpc-types-eth", + "anyhow", + "arc-swap", + "bop-common", + "clap", + "eyre", + "futures", + "jsonrpsee", + "op-alloy-consensus", + "op-alloy-network", + "op-alloy-rpc-types", + "reth", + "reth-chainspec 1.9.3", + "reth-db 1.9.3", + "reth-engine-tree 1.9.3", + "reth-evm 1.9.3", + "reth-exex 1.9.3", + "reth-node-builder 1.9.3", + "reth-optimism-chainspec 1.9.3", + "reth-optimism-cli 1.9.3", + "reth-optimism-consensus 1.9.3", + "reth-optimism-evm 1.9.3", + "reth-optimism-node 1.9.3", + "reth-optimism-primitives 1.9.3", + "reth-optimism-rpc 1.9.3", + "reth-revm 1.9.3", + "reth-rpc 1.9.3", + "reth-rpc-convert 1.9.3", + "reth-rpc-eth-api 1.9.3", + "reth-storage-api 1.9.3", + "reth-storage-errors 1.9.3", + "revm", + "thiserror 2.0.17", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "based-portal" version = "0.1.0" @@ -3047,7 +3100,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976" dependencies = [ "data-encoding", - "syn 1.0.109", + "syn 2.0.111", ] [[package]] @@ -7094,40 +7147,6 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95325155c684b1c89f7765e30bc1c42e4a6da51ca513615660cb8a62ef9a88e3" -[[package]] -name = "reth" -version = "0.1.0" -dependencies = [ - "alloy-consensus", - "alloy-eips", - "alloy-network", - "alloy-primitives", - "alloy-rpc-types", - "alloy-rpc-types-eth", - "anyhow", - "bop-common", - "clap", - "eyre", - "futures", - "jsonrpsee", - "op-alloy-consensus", - "op-alloy-network", - "op-alloy-rpc-types", - "reth 1.9.3", - "reth-db 1.9.3", - "reth-exex 1.9.3", - "reth-node-builder 1.9.3", - "reth-optimism-chainspec 1.9.3", - "reth-optimism-cli 1.9.3", - "reth-optimism-node 1.9.3", - "reth-rpc 1.9.3", - "reth-rpc-eth-api 1.9.3", - "thiserror 2.0.17", - "tokio", - "tokio-stream", - "tracing", -] - [[package]] name = "reth" version = "1.9.3" diff --git a/based/crates/reth/Cargo.toml b/based/crates/reth/Cargo.toml index 202af1c76..e594d714b 100644 --- a/based/crates/reth/Cargo.toml +++ b/based/crates/reth/Cargo.toml @@ -1,6 +1,6 @@ [package] edition.workspace = true -name = "reth" +name = "based-op-reth" repository.workspace = true rust-version.workspace = true version.workspace = true @@ -11,27 +11,39 @@ alloy-eips.workspace = true alloy-network.workspace = true alloy-primitives.workspace = true alloy-rpc-types.workspace = true +alloy-rpc-types-eth = "1.1.3" anyhow = "1.0.98" +arc-swap = "1.7.1" bop-common.workspace = true clap.workspace = true eyre.workspace = true futures.workspace = true jsonrpsee.workspace = true +op-alloy-consensus = "0.22.1" op-alloy-network.workspace = true op-alloy-rpc-types.workspace = true -thiserror.workspace = true -tokio.workspace = true -tokio-stream.workspace = true -tracing.workspace = true - reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } +reth-chainspec = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3", package = "reth-chainspec" } reth-db = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } +reth-engine-tree = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3", package = "reth-engine-tree" } +reth-evm = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3", package = "reth-evm" } reth-exex = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-node-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } -reth-optimism-chainspec = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } +reth-optimism-chainspec = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3", package = "reth-optimism-chainspec" } reth-optimism-cli = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } +reth-optimism-evm = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3", package = "reth-optimism-evm" } reth-optimism-node = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } +reth-optimism-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3", package = "reth-optimism-primitives" } +reth-optimism-rpc = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3", package = "reth-optimism-rpc" } +reth-revm = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3", package = "reth-revm" } reth-rpc = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } +reth-rpc-convert = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3", package = "reth-rpc-convert" } reth-rpc-eth-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } -op-alloy-consensus = "0.22.1" -alloy-rpc-types-eth = "1.1.3" +reth-storage-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3", package = "reth-storage-api" } +reth-storage-errors = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3", package = "reth-storage-errors" } +revm = { version = "31.0.1", features = ["optional_balance_check", "secp256k1", "std"], default-features = false } +thiserror.workspace = true +tokio.workspace = true +tokio-stream.workspace = true +tracing.workspace = true +reth_optimism_consensus = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3", package = "reth-optimism-consensus" } diff --git a/based/crates/reth/src/api/eth.rs b/based/crates/reth/src/api/eth.rs index b29896700..6e8835183 100644 --- a/based/crates/reth/src/api/eth.rs +++ b/based/crates/reth/src/api/eth.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, time::Duration}; +use std::{sync::Arc, time::Duration}; use alloy_eips::{BlockId, BlockNumberOrTag}; use alloy_primitives::{Address, TxHash, U256}; @@ -7,6 +7,7 @@ use alloy_rpc_types::{ simulate::{SimBlock, SimulatePayload, SimulatedBlock}, state::{EvmOverrides, StateOverride, StateOverridesBuilder}, }; +use arc_swap::ArcSwapOption; use jsonrpsee::{ core::{RpcResult, async_trait}, proc_macros::rpc, @@ -16,12 +17,16 @@ use op_alloy_network::Optimism; use op_alloy_rpc_types::OpTransactionRequest; use reth::{providers::CanonStateSubscriptions as _, rpc::server_types::eth::EthApiError}; use reth_rpc::EthFilter; +use reth_rpc_convert::RpcReceipt; use reth_rpc_eth_api::{ - EthApiTypes, EthFilterApiServer, RpcBlock, RpcReceipt, RpcTransaction, + EthApiTypes, EthFilterApiServer, RpcBlock, RpcTransaction, helpers::{EthBlocks, EthCall, EthState, EthTransactions, FullEthApi}, }; +use tokio::sync::broadcast::error::RecvError; use tokio_stream::{StreamExt, wrappers::BroadcastStream}; +use crate::{api::ToRpc as _, unsealed_block::UnsealedBlock}; + /// Max configured timeout for `eth_sendRawTransactionSync`. const SEND_RAW_TX_SYNC_TIMEOUT: Duration = Duration::from_millis(6_000); @@ -93,7 +98,7 @@ pub trait EthApi { pub struct EthApi { pub canonical: Eth, pub eth_filter: EthFilter, - pub unsealed_state: (), + pub unsealed_block: Arc>, pub unsealed_as_latest: bool, } @@ -115,10 +120,10 @@ where block_number = ?number ); - if self.use_unsealed_state(&number) { - // TODO: Implement pending blocks - - EthBlocks::rpc_block(&self.canonical, BlockNumberOrTag::Latest.into(), full).await.map_err(Into::into) + if self.use_unsealed_state(&number) && + let Some(unsealed_block) = self.unsealed_block.load_full() + { + Ok(Some(unsealed_block.to_rpc_block(full))) } else { EthBlocks::rpc_block(&self.canonical, number.into(), full).await.map_err(Into::into) } @@ -135,7 +140,11 @@ where return Ok(Some(canonical_receipt)); } - // TODO: Implement pending transaction receipts + if let Some(unsealed_block) = self.unsealed_block.load_full() { + if let Some(receipt) = unsealed_block.get_transaction_receipt(&tx_hash) { + return Ok(Some(receipt.into_rpc())); + } + } Ok(None) } @@ -147,7 +156,11 @@ where ); let block_id = block_number.unwrap_or_default(); if self.use_unsealed_state(&block_id) { - // TODO: Pending balance + if let Some(unsealed_block) = self.unsealed_block.load_full() { + if let Some(balance) = unsealed_block.get_balance(address) { + return Ok(balance); + } + } } EthState::balance(&self.canonical, address, block_number).await.map_err(Into::into) @@ -160,22 +173,18 @@ where ); let block_id = block_number.unwrap_or_default(); - if self.use_unsealed_state(&block_id) { - todo!(); - // let pending_blocks = self.flashblocks_state.get_pending_blocks(); - // let canon_block = pending_blocks.get_canonical_block_number(); - // let fb_count = pending_blocks.get_transaction_count(address); - - // let fb_count = 0; - // let canon_count = EthState::transaction_count(&self.canonical, address, Some(canon_block.into())) - // .await - // .map_err(Into::into)?; + let mut count = + EthState::transaction_count(&self.canonical, address, block_number).await.map_err(Into::into)?; - // return Ok(canon_count + fb_count); + if self.use_unsealed_state(&block_id) { + if let Some(unsealed_block) = self.unsealed_block.load_full() { + let unsealed_count = unsealed_block.get_transaction_count(address); + count += unsealed_count; + } } - EthState::transaction_count(&self.canonical, address, block_number).await.map_err(Into::into) + Ok(count) } async fn transaction_by_hash(&self, tx_hash: TxHash) -> RpcResult>> { @@ -194,13 +203,11 @@ where return Ok(Some(canonical_tx)); } - // TODO: - // Fall back to flashblocks for pending transactions - // let pending_blocks = self.flashblocks_state.get_pending_blocks(); - // if let Some(fb_transaction) = pending_blocks.get_transaction_by_hash(tx_hash) { - // self.metrics.get_transaction_by_hash.increment(1); - // return Ok(Some(fb_transaction)); - // } + if let Some(unsealed_block) = self.unsealed_block.load_full() { + if let Some(tx) = unsealed_block.get_transaction(&tx_hash) { + return Ok(Some(tx)); + } + } Ok(None) } @@ -280,14 +287,13 @@ where block_overrides = ?block_overrides, ); - let mut block_id = block_number.unwrap_or_default(); + let block_id = block_number.unwrap_or_default(); let mut pending_overrides = EvmOverrides::default(); // If the call is to pending block use cached override (if they exist) - if self.use_unsealed_state(&block_id) { - // TODO: - // let pending_blocks = self.flashblocks_state.get_pending_blocks(); - // block_id = pending_blocks.get_canonical_block_number().into(); - // pending_overrides.state = pending_blocks.get_state_overrides(); + if self.use_unsealed_state(&block_id) && + let Some(unsealed_block) = self.unsealed_block.load_full() + { + pending_overrides.state = unsealed_block.get_state_overrides(); } // Apply user's overrides on top @@ -319,14 +325,13 @@ where overrides = ?overrides, ); - let mut block_id = block_number.unwrap_or_default(); + let block_id = block_number.unwrap_or_default(); let mut pending_overrides = EvmOverrides::default(); // If the call is to pending block use cached override (if they exist) - if self.use_unsealed_state(&block_id) { - // TODO: - // let pending_blocks = self.flashblocks_state.get_pending_blocks(); - // block_id = pending_blocks.get_canonical_block_number().into(); - // pending_overrides.state = pending_blocks.get_state_overrides(); + if self.use_unsealed_state(&block_id) && + let Some(unsealed_block) = self.unsealed_block.load_full() + { + pending_overrides.state = unsealed_block.get_state_overrides(); } let mut state_overrides_builder = StateOverridesBuilder::new(pending_overrides.state.unwrap_or_default()); @@ -348,15 +353,14 @@ where block_number = ?block_number, ); - let mut block_id = block_number.unwrap_or_default(); + let block_id = block_number.unwrap_or_default(); let mut pending_overrides = EvmOverrides::default(); // If the call is to pending block use cached override (if they exist) - if self.use_unsealed_state(&block_id) { - // TODO: - // let pending_blocks = self.flashblocks_state.get_pending_blocks(); - // block_id = pending_blocks.get_canonical_block_number().into(); - // pending_overrides.state = pending_blocks.get_state_overrides(); + if self.use_unsealed_state(&block_id) && + let Some(unsealed_block) = self.unsealed_block.load_full() + { + pending_overrides.state = unsealed_block.get_state_overrides(); } // Prepend flashblocks pending overrides to the block state calls @@ -399,11 +403,13 @@ where // Mixed query: toBlock is pending, so we need to combine historical + pending logs let mut all_logs = Vec::new(); - // TODO: - // let pending_blocks = self.flashblocks_state.get_pending_blocks(); - // let pending_logs = pending_blocks.get_pending_logs(&filter); + if self.use_unsealed_state(&to_block.unwrap_or_default()) && + let Some(unsealed_block) = self.unsealed_block.load_full() + { + let pending_logs = unsealed_block.get_unsealed_logs(&filter); + all_logs.extend(pending_logs); + } - let mut fetched_logs = HashSet::new(); // Get historical logs if fromBlock is not pending if !matches!(from_block, Some(BlockNumberOrTag::Pending)) { // Create a filter for historical data (fromBlock to latest) @@ -412,22 +418,11 @@ where FilterBlockOption::Range { from_block, to_block: Some(BlockNumberOrTag::Latest) }; let historical_logs: Vec = self.eth_filter.logs(historical_filter).await?; - for log in &historical_logs { - fetched_logs.insert((log.block_number, log.log_index)); - } all_logs.extend(historical_logs); } - // Always get pending logs when toBlock is pending - - // TODO: // Dedup any logs from the pending state that may already have been covered in the historical logs - // let deduped_pending_logs: Vec = pending_logs - // .iter() - // .filter(|log| !fetched_logs.contains(&(log.block_number, log.log_index))) - // .cloned() - // .collect(); - // all_logs.extend(deduped_pending_logs); + all_logs.dedup(); Ok(all_logs) } @@ -438,27 +433,29 @@ where Eth: FullEthApi + Send + Sync + 'static, { async fn wait_for_frag_receipt(&self, tx_hash: TxHash) -> Option> { - // TODO: Subscribe to frags - // let mut receiver = self.flashblocks_state.subscribe_to_flashblocks(); - - // loop { - // match receiver.recv().await { - // Ok(pending_state) if pending_state.get_receipt(tx_hash).is_some() => { - // debug!(message = "found receipt in flashblock", tx_hash = %tx_hash); - // return pending_state.get_receipt(tx_hash); - // } - // Ok(_) => { - // trace!(message = "flashblock does not contain receipt", tx_hash = %tx_hash); - // } - // Err(RecvError::Closed) => { - // debug!(message = "flashblocks receipt queue closed"); - // return None; - // } - // Err(RecvError::Lagged(_)) => { - // warn!("Flashblocks receipt queue lagged, maybe missing receipts"); - // } - // } - // } + if let Some(unsealed_block) = self.unsealed_block.load_full() { + let mut receiver = unsealed_block.subscribe_new_blocks(); + + loop { + match receiver.recv().await { + Ok(block) => { + if let Some(receipt) = unsealed_block.get_transaction_receipt(&tx_hash) { + tracing::debug!(%tx_hash, block_number = block.number(), block_hash = %block.hash(), "Receipt found"); + return Some(receipt.into_rpc()); + } + + continue; + } + Err(RecvError::Closed) => { + tracing::debug!("Unsealed block receipt queue closed"); + return None; + } + Err(RecvError::Lagged(_)) => { + tracing::warn!("Unsealed block receipt queue lagged, maybe missing receipts"); + } + } + } + } None } diff --git a/based/crates/reth/src/api/mod.rs b/based/crates/reth/src/api/mod.rs index c4997653c..696463efb 100644 --- a/based/crates/reth/src/api/mod.rs +++ b/based/crates/reth/src/api/mod.rs @@ -1,2 +1,21 @@ +use op_alloy_network::Optimism; +use op_alloy_rpc_types::OpTransactionReceipt; +use reth_rpc_eth_api::RpcReceipt; + pub mod engine; pub mod eth; + +pub(crate) trait ToRpc { + type RpcVariant; + + /// Convert the type into its RPC variant. + fn into_rpc(self) -> Self::RpcVariant; +} + +impl ToRpc for OpTransactionReceipt { + type RpcVariant = RpcReceipt; + + fn into_rpc(self) -> Self::RpcVariant { + RpcReceipt::::from(self) + } +} diff --git a/based/crates/reth/src/cli.rs b/based/crates/reth/src/cli.rs index c1f73fef8..9fe556889 100644 --- a/based/crates/reth/src/cli.rs +++ b/based/crates/reth/src/cli.rs @@ -1,6 +1,8 @@ +use std::sync::{Arc, OnceLock}; + use clap::Parser; use futures::TryStreamExt as _; -use reth::providers::CanonStateSubscriptions as _; +use reth::providers::{CanonStateSubscriptions as _, providers::BlockchainProvider}; use reth_exex::ExExEvent; use reth_node_builder::Node; use reth_optimism_cli::{Cli, chainspec::OpChainSpecParser}; @@ -54,20 +56,21 @@ where /// Internal helper that runs the node with a parsed CLI instance. fn run_with_cli(cli: Cli) -> eyre::Result<()> { cli.run(|builder, args| async move { - let driver = Driver::new(args.based_op.unsealed_as_latest); + let driver = Arc::new(OnceLock::::new()); let op_node = OpNode::new(args.rollup.clone()); let node_handle = builder - .with_types::() + .with_types_and_provider::>() .with_components(op_node.components()) .with_add_ons(op_node.add_ons()) // Install the execution extension to handle canonical chain updates .install_exex("based-op", { // Get a clone of the driver handle. - let driver = driver.clone(); + let driver = Arc::clone(&driver); move |mut ctx| async move { Ok(async move { + let driver = driver.get_or_init(|| Driver::new(ctx.provider().clone())); while let Some(note) = ctx.notifications.try_next().await? { // TODO: Handle reorged and reverted chains? if let Some(committed) = note.committed_chain() { @@ -84,45 +87,48 @@ fn run_with_cli(cli: Cli) -> eyre::Result<() }) } }) - .extend_rpc_modules(move |ctx| { - let provider = ctx.provider().clone(); - - let mut canon_stream = provider.subscribe_to_canonical_state(); - - // NOTE: Not entirely sure why this is needed - // Ref: - tokio::spawn(async move { - while let Ok(notif) = canon_stream.recv().await { - provider.canonical_in_memory_state().notify_canon_state(notif); - } - }); - - // Add based engine API modules to the existing auth module. - ctx.auth_module.merge_auth_methods(BasedEngineApi::new(driver).into_rpc())?; - - // Print supported engine_ methods - let methods = ctx - .auth_module - .module_mut() - .method_names() - .filter(|m| m.starts_with("engine_")) - .collect::>(); - - tracing::info!(supported_methods = ?methods, "Configured based engine API"); - - // Configure extended Eth API - let eth = EthApi { - canonical: ctx.registry.eth_api().clone(), - eth_filter: ctx.registry.eth_handlers().filter.clone(), - unsealed_state: (), - unsealed_as_latest: args.based_op.unsealed_as_latest, - }; - - ctx.modules.replace_configured(eth.into_rpc())?; - - // TODO: - // - Replace eth API - Ok(()) + .extend_rpc_modules({ + let driver = driver.clone(); + move |ctx| { + let provider = ctx.provider().clone(); + + let mut canon_stream = provider.subscribe_to_canonical_state(); + + // NOTE: Not entirely sure why this is needed + // Ref: + tokio::spawn(async move { + while let Ok(notif) = canon_stream.recv().await { + provider.canonical_in_memory_state().notify_canon_state(notif); + } + }); + + let driver = driver.get_or_init(|| Driver::new(ctx.provider().clone())); + + // Add based engine API modules to the existing auth module. + ctx.auth_module.merge_auth_methods(BasedEngineApi::new(driver.clone()).into_rpc())?; + + // Print supported engine_ methods + let methods = ctx + .auth_module + .module_mut() + .method_names() + .filter(|m| m.starts_with("engine_")) + .collect::>(); + + tracing::info!(supported_methods = ?methods, "Configured based engine API"); + + // Configure extended Eth API + let eth = EthApi { + canonical: ctx.registry.eth_api().clone(), + eth_filter: ctx.registry.eth_handlers().filter.clone(), + unsealed_block: driver.unsealed_block(), + unsealed_as_latest: args.based_op.unsealed_as_latest, + }; + + ctx.modules.replace_configured(eth.into_rpc())?; + + Ok(()) + } }) .launch() .await?; diff --git a/based/crates/reth/src/driver.rs b/based/crates/reth/src/driver.rs index ef98207ba..fcf4768a4 100644 --- a/based/crates/reth/src/driver.rs +++ b/based/crates/reth/src/driver.rs @@ -1,31 +1,39 @@ -use std::time::Instant; +use std::{sync::Arc, time::Instant}; +use alloy_consensus::Header; use alloy_primitives::B256; -use alloy_rpc_types::Block; +use arc_swap::ArcSwapOption; use bop_common::{ p2p::{EnvV0, FragV0, SealV0}, typedefs::OpBlock, }; +use reth_chainspec::{ChainSpecProvider, EthChainSpec}; +use reth_optimism_chainspec::OpHardforks; +use reth_storage_api::{ + BlockReaderIdExt, BlockWriter, CanonChainTracker, DatabaseProviderFactory, StateProviderFactory, +}; use tokio::sync::{mpsc, oneshot}; use tracing::{error, info}; use crate::{ error::{DriverError, ValidateSealError}, - exec::{NoopExecutor, UnsealedExecutor, apply_exec_output}, + exec::{StateExecutor, UnsealedExecutor}, unsealed_block::UnsealedBlock, }; /// Result of submitting a frag to the driver. #[derive(Debug, Clone, Copy)] pub enum FragStatus { + Ignored, Valid, Invalid, } /// Actor handle for sending unsealed-block commands to the driver task. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Driver { tx: mpsc::Sender, + unsealed_block: Arc>, } impl From> for DriverError { @@ -64,43 +72,53 @@ fn respond(resp: Resp, res: Result) { }); } +#[derive(Debug)] enum Cmd { EnvV0 { env: EnvV0, resp: Resp<()> }, NewFragV0 { frag: FragV0, resp: Resp }, SealFragV0 { seal: SealV0, resp: Resp<()> }, - ForkchoiceUpdated { block: OpBlock, resp: Resp<()> }, + ForkchoiceUpdated { block: Box, resp: Resp<()> }, GetHeaderView { resp: Resp }, } #[derive(Debug, Clone)] pub struct HeaderView { pub enabled: bool, - pub header: Option, + pub header: Option
, } /// Single-threaded state owned by the driver task (unsealed block + executor + counters). -/// Essentialy should be implemented using based-op-reth +/// Essentially should be implemented using based-op-reth #[derive(Debug)] pub struct DriverInner { - pub enabled_unsealed_as_latest: bool, - pub current_unsealed_block: Option, + pub current_unsealed_block: Arc>, pub exec: E, pub fcu_count_since_unseal_reset: usize, } impl Driver { - pub fn new(unsealed_as_latest: bool) -> Self { - Self::spawn(DriverInner { - enabled_unsealed_as_latest: unsealed_as_latest, - current_unsealed_block: None, - exec: NoopExecutor, - fcu_count_since_unseal_reset: 0, - }) + pub fn new(client: Client) -> Self + where + Client: StateProviderFactory + + ChainSpecProvider + OpHardforks> + + BlockReaderIdExt
+ + CanonChainTracker
+ + DatabaseProviderFactory + + Clone + + 'static, + ::ProviderRW: BlockWriter, + { + let executor = StateExecutor::new(client); + let current_unsealed_block = executor.shared_unsealed_block(); + + Self::spawn(DriverInner { current_unsealed_block, exec: executor, fcu_count_since_unseal_reset: 0 }) } /// Spawns the driver actor task and returns a handle used to send commands to it. pub fn spawn(inner: DriverInner) -> Self { + info!(target: "based-op", "Spawning frag driver"); let (tx, mut rx) = mpsc::channel::(256); + let unsealed_block = inner.current_unsealed_block.clone(); tokio::spawn(async move { let mut inner = inner; @@ -117,7 +135,7 @@ impl Driver { respond(resp, inner.handle_seal_frag_v0(seal).await); } Cmd::ForkchoiceUpdated { block, resp } => { - respond(resp, inner.handle_forkchoice_updated(block).await); + respond(resp, inner.handle_forkchoice_updated(*block).await); } Cmd::GetHeaderView { resp } => { let _ = resp.send(Reply::Ok(inner.get_header_view())); @@ -126,7 +144,12 @@ impl Driver { } }); - Self { tx } + Self { tx, unsealed_block } + } + + /// Returns a clone of the current unsealed block. + pub fn unsealed_block(&self) -> Arc> { + Arc::clone(&self.unsealed_block) } /// Starts a new unsealed block execution context for the given environment. @@ -153,7 +176,7 @@ impl Driver { /// Notifies the driver about a forkchoice update and resets state on mismatch. pub async fn forkchoice_updated(&self, block: OpBlock) -> Result<(), DriverError> { let (resp_tx, resp_rx) = oneshot::channel(); - self.tx.send(Cmd::ForkchoiceUpdated { block, resp: resp_tx }).await?; + self.tx.send(Cmd::ForkchoiceUpdated { block: Box::new(block), resp: resp_tx }).await?; resp_rx.await?.into_result() } @@ -174,7 +197,6 @@ impl Driver { impl DriverInner { fn reset_current_unsealed_block(&mut self) { - self.current_unsealed_block = None; self.exec.reset(); self.fcu_count_since_unseal_reset = 0; } @@ -182,7 +204,7 @@ impl DriverInner { async fn handle_env_v0(&mut self, env: EnvV0) -> Result<(), DriverError> { info!(block = env.number, "envV0 received"); - if let Some(current) = self.current_unsealed_block.as_ref() { + if let Some(current) = self.current_unsealed_block.load_full().as_ref() { let current_num = current.env.number; if current_num >= env.number { @@ -193,8 +215,7 @@ impl DriverInner { self.reset_current_unsealed_block(); } - self.exec.ensure_env(&env).await?; - self.current_unsealed_block = Some(UnsealedBlock::new(env)); + self.exec.ensure_env(&env)?; // this should update current_unsealed_block too because shared arc self.fcu_count_since_unseal_reset = 0; Ok(()) } @@ -202,7 +223,7 @@ impl DriverInner { async fn handle_forkchoice_updated(&mut self, block: OpBlock) -> Result<(), DriverError> { self.fcu_count_since_unseal_reset += 1; - let Some(ub) = self.current_unsealed_block.as_ref() else { + let Some(ub) = self.current_unsealed_block.load_full() else { return Ok(()); }; @@ -220,7 +241,7 @@ impl DriverInner { async fn handle_new_frag_v0(&mut self, frag: FragV0) -> Result { let start = Instant::now(); - let Some(ub) = self.current_unsealed_block.as_mut() else { + let Some(ub) = self.current_unsealed_block.load_full() else { return Err(DriverError::NotInitialized); }; @@ -228,7 +249,7 @@ impl DriverInner { if frag.block_number < ub.env.number { info!(frag_block = frag.block_number, env_number = ub.env.number, "stale frag (older block), ignoring"); - return Ok(FragStatus::Valid); + return Ok(FragStatus::Ignored); } if let Err(e) = ub.validate_new_frag(&frag) { @@ -237,8 +258,8 @@ impl DriverInner { return Err(DriverError::from(e)); } - let out = match self.exec.execute_frag(ub, &frag).await { - Ok(out) => out, + match self.exec.execute_frag(&frag) { + Ok(()) => (), Err(e) => { error!(error = %e, "execution failed, discarding unsealed block"); self.reset_current_unsealed_block(); @@ -246,14 +267,11 @@ impl DriverInner { } }; - apply_exec_output(ub, out); - ub.accept_frag(frag); - info!(elapsed_ms = start.elapsed().as_millis(), "frag inserted + executed"); if ub.last_frag().is_some_and(|f| f.is_last) { info!("last frag received, pre-sealing block"); - if let Err(e) = self.exec.seal(ub).await { + if let Err(e) = self.exec.seal() { error!(error = %e, "seal failed, discarding unsealed block"); self.reset_current_unsealed_block(); return Err(DriverError::from(e)); @@ -265,7 +283,7 @@ impl DriverInner { async fn handle_seal_frag_v0(&mut self, seal: SealV0) -> Result<(), DriverError> { let start = Instant::now(); - let Some(ub) = self.current_unsealed_block.as_ref() else { + let Some(ub) = self.current_unsealed_block.load_full() else { return Err(DriverError::NotInitialized); }; @@ -274,9 +292,7 @@ impl DriverInner { return Ok(()); } - let presealed_block = self.exec.get_block(seal.block_hash, seal.block_number).await; - - let presealed_block = match presealed_block { + let presealed_block = match self.exec.get_block(seal.block_hash) { Ok(b) => b, Err(e) => { self.reset_current_unsealed_block(); @@ -284,9 +300,9 @@ impl DriverInner { } }; - self.validate_seal_frag_v0(&presealed_block, ub, &seal)?; + self.validate_seal_frag_v0(&presealed_block, ub.as_ref(), &seal)?; - self.exec.set_canonical(&presealed_block).await?; + self.exec.set_canonical(&presealed_block)?; self.reset_current_unsealed_block(); @@ -296,65 +312,55 @@ impl DriverInner { fn validate_seal_frag_v0( &self, - presealed_block: &Block, + presealed_block: &OpBlock, ub: &UnsealedBlock, seal: &SealV0, ) -> Result<(), ValidateSealError> { - let expected_block_hash: B256 = presealed_block.header.hash.into(); + let expected_block_hash: B256 = presealed_block.header.hash_slow(); if expected_block_hash != seal.block_hash { - return Err(ValidateSealError::BlockHashMismatch { expected: expected_block_hash, got: seal.block_hash }); + return Err(ValidateSealError::BlockHash { expected: expected_block_hash, got: seal.block_hash }); } let expected_parent_hash = presealed_block.header.parent_hash; if expected_parent_hash != seal.parent_hash { - return Err(ValidateSealError::ParentHashMismatch { expected: expected_parent_hash, got: seal.parent_hash }); + return Err(ValidateSealError::ParentHash { expected: expected_parent_hash, got: seal.parent_hash }); } let expected_state_root = presealed_block.header.state_root; if expected_state_root != seal.state_root { - return Err(ValidateSealError::StateRootMismatch { expected: expected_state_root, got: seal.state_root }); + return Err(ValidateSealError::StateRoot { expected: expected_state_root, got: seal.state_root }); } let expected_tx_root = presealed_block.header.transactions_root; if expected_tx_root != seal.transactions_root { - return Err(ValidateSealError::TransactionsRootMismatch { - expected: expected_tx_root, - got: seal.transactions_root, - }); + return Err(ValidateSealError::TransactionsRoot { expected: expected_tx_root, got: seal.transactions_root }); } let expected_receipts_root = presealed_block.header.receipts_root; if expected_receipts_root != seal.receipts_root { - return Err(ValidateSealError::ReceiptsRootMismatch { - expected: expected_receipts_root, - got: seal.receipts_root, - }); + return Err(ValidateSealError::ReceiptsRoot { expected: expected_receipts_root, got: seal.receipts_root }); } let expected_gas_used = presealed_block.header.gas_used; if expected_gas_used != seal.gas_used { - return Err(ValidateSealError::GasUsedMismatch { expected: expected_gas_used, got: seal.gas_used }); + return Err(ValidateSealError::GasUsed { expected: expected_gas_used, got: seal.gas_used }); } let expected_gas_limit = presealed_block.header.gas_limit; if expected_gas_limit != seal.gas_limit { - return Err(ValidateSealError::GasLimitMismatch { expected: expected_gas_limit, got: seal.gas_limit }); + return Err(ValidateSealError::GasLimit { expected: expected_gas_limit, got: seal.gas_limit }); } let expected_total_frags = ub.frags.len() as u64; if expected_total_frags != seal.total_frags { - return Err(ValidateSealError::TotalFragsMismatch { expected: expected_total_frags, got: seal.total_frags }); + return Err(ValidateSealError::TotalFrags { expected: expected_total_frags, got: seal.total_frags }); } Ok(()) } fn get_header_view(&self) -> HeaderView { - if !self.enabled_unsealed_as_latest { - return HeaderView { enabled: false, header: None }; - } - - let header = self.current_unsealed_block.as_ref().map(|ub| ub.temp_header()); + let header = self.current_unsealed_block.load_full().map(|ub| ub.get_header()); HeaderView { enabled: true, header } } } diff --git a/based/crates/reth/src/error.rs b/based/crates/reth/src/error.rs index 96778fa2a..f96808ff9 100644 --- a/based/crates/reth/src/error.rs +++ b/based/crates/reth/src/error.rs @@ -1,5 +1,10 @@ +use alloy_consensus::crypto::RecoveryError; use alloy_eips::eip2718::Eip2718Error; use alloy_primitives::B256; +use op_alloy_consensus::EIP1559ParamError; +use reth_optimism_evm::OpBlockExecutionError; +use reth_optimism_rpc::OpEthApiError; +use reth_storage_errors::ProviderError; use thiserror::Error; #[derive(Debug, Error)] @@ -37,28 +42,28 @@ pub enum DriverError { #[derive(Debug, Error)] pub enum ValidateSealError { #[error("block hash mismatch, expected {expected:?}, got {got:?}")] - BlockHashMismatch { expected: B256, got: B256 }, + BlockHash { expected: B256, got: B256 }, #[error("parent hash mismatch, expected {expected:?}, got {got:?}")] - ParentHashMismatch { expected: B256, got: B256 }, + ParentHash { expected: B256, got: B256 }, #[error("state root mismatch, expected {expected:?}, got {got:?}")] - StateRootMismatch { expected: B256, got: B256 }, + StateRoot { expected: B256, got: B256 }, #[error("transactions root mismatch, expected {expected:?}, got {got:?}")] - TransactionsRootMismatch { expected: B256, got: B256 }, + TransactionsRoot { expected: B256, got: B256 }, #[error("receipts root mismatch, expected {expected:?}, got {got:?}")] - ReceiptsRootMismatch { expected: B256, got: B256 }, + ReceiptsRoot { expected: B256, got: B256 }, #[error("gas used mismatch, expected {expected}, got {got}")] - GasUsedMismatch { expected: u64, got: u64 }, + GasUsed { expected: u64, got: u64 }, #[error("gas limit mismatch, expected {expected}, got {got}")] - GasLimitMismatch { expected: u64, got: u64 }, + GasLimit { expected: u64, got: u64 }, #[error("total frags mismatch, expected {expected}, got {got}")] - TotalFragsMismatch { expected: u64, got: u64 }, + TotalFrags { expected: u64, got: u64 }, } #[derive(Debug, Error)] @@ -81,6 +86,9 @@ pub enum UnsealedBlockError { #[error("received frag after last frag already accepted")] AlreadyEnded, + + #[error("operation failed: {0}")] + Failed(String), } #[derive(Debug, Error)] @@ -93,4 +101,22 @@ pub enum ExecError { #[error("seal failed: {0}")] SealFailed(String), + + #[error(transparent)] + StorageProvider(#[from] ProviderError), + + #[error(transparent)] + OpBlockExecution(#[from] OpBlockExecutionError), + + #[error(transparent)] + Recovery(#[from] RecoveryError), + + #[error(transparent)] + Eip1559Param(#[from] EIP1559ParamError), + + #[error(transparent)] + OpEthApi(#[from] OpEthApiError), + + #[error(transparent)] + UnsealedBlock(#[from] UnsealedBlockError), } diff --git a/based/crates/reth/src/exec.rs b/based/crates/reth/src/exec.rs index e116d699f..ff71cfc2c 100644 --- a/based/crates/reth/src/exec.rs +++ b/based/crates/reth/src/exec.rs @@ -1,82 +1,438 @@ -use std::future::Future; +use std::sync::{Arc, Mutex}; -use alloy_primitives::{B256, BlockNumber}; -use alloy_rpc_types::{Block, Log, TransactionReceipt}; -use bop_common::p2p::{EnvV0, FragV0}; +use alloy_consensus::{ + BlockBody, Header, Receipt, Transaction, + transaction::{Recovered, SignerRecoverable, TransactionMeta}, +}; +use alloy_eips::{BlockNumberOrTag, Typed2718, eip2718::Decodable2718}; +use alloy_primitives::{B256, Bytes, Sealable}; +use alloy_rpc_types::Log; +use arc_swap::ArcSwapOption; +use bop_common::{ + p2p::{EnvV0, FragV0}, + typedefs::Database, +}; +use op_alloy_consensus::OpTxEnvelope; +use op_alloy_rpc_types::{OpTransactionReceipt, Transaction as RPCTransaction}; +use reth::{ + api::Block as RethBlock, + network::cache::LruMap, + primitives::{SealedBlock, SealedHeader}, +}; +use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks as _}; +use reth_evm::{ConfigureEvm, Evm, op_revm::OpHaltReason}; +use reth_optimism_chainspec::OpHardforks; +use reth_optimism_consensus::isthmus; +use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; +use reth_optimism_primitives::{OpBlock, OpPrimitives, OpReceipt, OpTransactionSigned}; +use reth_optimism_rpc::OpReceiptBuilder; +use reth_revm::{ + DatabaseCommit, State, + context::result::{ExecutionResult, ResultAndState}, + database::StateProviderDatabase, +}; +use reth_rpc_convert::transaction::ConvertReceiptInput; +use reth_storage_api::{ + BlockReaderIdExt, BlockWriter, CanonChainTracker, DBProvider, DatabaseProviderFactory, StateProviderFactory, +}; +use revm::database::CacheDB; use crate::{error::ExecError, unsealed_block::UnsealedBlock}; -#[derive(Debug, Clone)] -pub struct ExecOutput { - pub receipts: Vec, - pub logs: Vec, - pub gas_used_delta: u64, -} +const BLOCK_CACHE_LIMIT: u32 = 256; /// This trait is the ONLY place that needs to know about Reth internals. /// Everything else is just state-machine + bookkeeping. pub trait UnsealedExecutor: Send { /// Ensure the executor context is ready for this env (initialize overlay state, block env, etc.) - fn ensure_env(&mut self, env: &EnvV0) -> impl Future> + Send + '_; + fn ensure_env(&mut self, env: &EnvV0) -> Result<(), ExecError>; /// Execute all txs in `frag` on top of current overlay state. /// /// MUST be cumulative: txs execute after all previous frags's txs. - fn execute_frag( - &mut self, - ub: &UnsealedBlock, - frag: &FragV0, - ) -> impl Future> + Send + '_; + fn execute_frag(&mut self, frag: &FragV0) -> Result<(), ExecError>; - fn seal(&mut self, ub: &UnsealedBlock) -> impl Future> + Send + '_; + fn seal(&mut self) -> Result<(), ExecError>; - fn set_canonical(&mut self, b: &Block) -> impl Future> + Send + '_; + fn set_canonical(&mut self, b: &OpBlock) -> Result<(), ExecError>; - fn get_block(&self, hash: B256, number: BlockNumber) -> impl Future> + Send + '_; + fn get_block(&self, hash: B256) -> Result; /// Reset overlay state completely. fn reset(&mut self); } -/// Apply the executor output to the UnsealedBlock (common logic). -pub fn apply_exec_output(ub: &mut UnsealedBlock, out: ExecOutput) { - ub.receipts.extend(out.receipts); - ub.logs.extend(out.logs); - ub.cumulative_gas_used = ub.cumulative_gas_used.saturating_add(out.gas_used_delta); +pub struct StateExecutor { + client: Client, + current_unsealed_block: Arc>, + block_cache: Mutex>, } -/// A very small “dummy” executor so you can compile & test state machine early. -/// Replace with Reth executor. -pub struct NoopExecutor; +impl StateExecutor { + pub fn new(client: Client) -> Self { + Self { + client, + current_unsealed_block: Arc::new(ArcSwapOption::new(None)), + block_cache: Mutex::new(LruMap::new(BLOCK_CACHE_LIMIT)), + } + } + + pub fn shared_unsealed_block(&self) -> Arc> { + Arc::clone(&self.current_unsealed_block) + } +} + +impl UnsealedExecutor for StateExecutor +where + Client: StateProviderFactory + + ChainSpecProvider + OpHardforks> + + BlockReaderIdExt
+ + CanonChainTracker
+ + DatabaseProviderFactory + + Clone + + 'static, + ::ProviderRW: BlockWriter, +{ + fn ensure_env(&mut self, env: &EnvV0) -> Result<(), ExecError> { + let Some(parent) = self.client.block_by_hash(env.parent_hash)? else { + return Err(ExecError::Failed(format!("parent block {} not found", env.parent_hash))) + }; + + let parent_header = parent.header(); + let None = self.current_unsealed_block.load_full() else { return Err(ExecError::NotInitialized) }; + + let expected_block_number = parent_header.number.saturating_sub(1); + if env.number != expected_block_number { + return Err(ExecError::Failed(format!( + "env block number doesn't match expected block number, expected {}, received {}", + expected_block_number, env.number + ))) + } + + if env.timestamp < parent_header.timestamp { + return Err(ExecError::Failed(format!( + "env timestamp is lower than parent block timestamp, parent timestamp {}, env timestamp {}", + parent_header.timestamp, env.timestamp + ))) + } + + let state_provider = + self.client.state_by_block_number_or_tag(BlockNumberOrTag::Number(parent_header.number))?; + let state_provider_db = StateProviderDatabase::new(state_provider); + let state = State::builder().with_database(state_provider_db).with_bundle_update().build(); + + // Check if the current block is a prague block + let is_prague = self.client.chain_spec().is_prague_active_at_timestamp(env.timestamp); + + let ub = UnsealedBlock::new(env.clone(), is_prague).with_db_cache(CacheDB::new(state).cache); + self.current_unsealed_block.store(Some(Arc::new(ub))); + + Ok(()) + } + + fn execute_frag(&mut self, frag: &FragV0) -> Result<(), ExecError> { + let chain_spec = self.client.chain_spec().clone(); + + let ub_arc_opt = self.current_unsealed_block.load_full(); + let frag = frag.clone(); + + let ub_arc = ub_arc_opt.ok_or(ExecError::NotInitialized)?; + + // Make an owned, mutable working copy from the start + let mut ub = ub_arc.as_ref().clone_for_update(); + + let ub_cache = ub.get_db_cache(); + let canonical_block = ub.env.number.saturating_sub(1); + + let last_block_header = self + .client + .header_by_number(canonical_block) + .map_err(|e| ExecError::Failed(format!("header_by_number({canonical_block}) failed: {e}")))? + .ok_or_else(|| ExecError::Failed(format!("missing parent header at {canonical_block}")))?; + + let evm_config = OpEvmConfig::optimism(self.client.chain_spec()); + + let state_provider = self + .client + .state_by_block_number_or_tag(BlockNumberOrTag::Number(canonical_block)) + .map_err(|e| ExecError::Failed(format!("state_by_block_number_or_tag({canonical_block}) failed: {e}")))?; + + let state_provider_db = StateProviderDatabase::new(state_provider); + let state = State::builder().with_database(state_provider_db).with_bundle_update().build(); + + let mut db = CacheDB { cache: ub_cache, db: state }; + + let mut state_overrides = ub.get_state_overrides().unwrap_or_default(); + + let block: OpBlock = build_op_block_from_ub_and_frag(&ub, &frag)?; + let mut l1_block_info = reth_optimism_evm::extract_l1_info(&block.body)?; + let header = block.header.clone().seal_slow(); + + let block_env_attributes = OpNextBlockEnvAttributes { + timestamp: ub.env.timestamp, + suggested_fee_recipient: ub.env.beneficiary, + prev_randao: ub.env.prevrandao, + gas_limit: ub.env.gas_limit, + parent_beacon_block_root: Some(ub.env.parent_beacon_block_root), + extra_data: block.extra_data.clone(), + }; + + let evm_env = evm_config.next_evm_env(&last_block_header, &block_env_attributes)?; + let mut evm = evm_config.evm_with_env(db, evm_env); + + let mut gas_used: u64 = ub.cumulative_blob_gas_used; + let mut logs: Vec = Vec::new(); + let mut next_log_index = 0usize; + let mut receipts: Vec = Vec::new(); + + for (idx, transaction) in block.body.transactions.iter().enumerate() { + let tx_hash = transaction.tx_hash(); + let sender = transaction.recover_signer()?; + ub.increment_nonce(sender); + + let recovered_transaction = Recovered::new_unchecked(transaction.clone(), sender); + let envelope = recovered_transaction.clone().convert::(); + let is_deposit = transaction.is_deposit(); + + let effective_gas_price = if is_deposit { + 0 + } else { + block + .base_fee_per_gas + .map(|base_fee| transaction.effective_tip_per_gas(base_fee).unwrap_or_default() + base_fee as u128) + .unwrap_or_else(|| transaction.max_fee_per_gas()) + }; + + let deposit_nonce = if is_deposit && chain_spec.is_regolith_active_at_timestamp(ub.env.timestamp) { + // depositor nonce (use signer account) + let acc = evm + .db_mut() + .basic(sender) + .map_err(|e| ExecError::Failed(format!("get acc nonce basic() failed: {e}")))? + .unwrap_or_default(); + Some(acc.nonce) // pre-tx nonce + } else { + None + }; -impl UnsealedExecutor for NoopExecutor { - fn ensure_env(&mut self, _env: &EnvV0) -> impl Future> + Send + '_ { - async move { Ok(()) } + let deposit_receipt_version = + if is_deposit && chain_spec.is_canyon_active_at_timestamp(ub.env.timestamp) { Some(1) } else { None }; + + let rpc_txn = RPCTransaction { + inner: alloy_rpc_types_eth::Transaction { + inner: envelope, + block_hash: Some(header.hash()), + block_number: Some(block.number), + transaction_index: Some(idx as u64), + effective_gas_price: Some(effective_gas_price), + }, + deposit_nonce, + deposit_receipt_version, + }; + + ub.with_transaction(rpc_txn); + + match evm.transact(recovered_transaction) { + Ok(ResultAndState { state, result }) => { + for (addr, acc) in &state { + let existing_override = state_overrides.entry(*addr).or_default(); + existing_override.balance = Some(acc.info.balance); + existing_override.nonce = Some(acc.info.nonce); + existing_override.code = acc.info.code.clone().map(|code| code.bytes()); + + let existing = existing_override.state_diff.get_or_insert(Default::default()); + let changed_slots = + acc.storage.iter().map(|(&key, slot)| (B256::from(key), B256::from(slot.present_value))); + + existing.extend(changed_slots); + } + + evm.db_mut().commit(state); + + let (success, tx_gas_used, tx_logs) = split_execution_result(&result); + gas_used = gas_used.saturating_add(tx_gas_used); + + logs.extend(tx_logs.iter().map(|inner| Log { inner: inner.clone(), ..Default::default() })); + + let base_receipt = Receipt { status: success.into(), cumulative_gas_used: gas_used, logs: tx_logs }; + + let ty = transaction.ty(); + let op_receipt = wrap_op_receipt(ty, base_receipt, deposit_nonce, deposit_receipt_version)?; + + let meta = TransactionMeta { + tx_hash, + index: idx as u64, + block_hash: header.hash(), + block_number: block.number, + base_fee: block.base_fee_per_gas, + excess_blob_gas: block.excess_blob_gas, + timestamp: block.timestamp, + }; + + let input: ConvertReceiptInput<'_, OpPrimitives> = ConvertReceiptInput { + receipt: op_receipt, + tx: Recovered::new_unchecked(transaction, sender), + gas_used: tx_gas_used, + next_log_index, + meta, + }; + + let receipt = OpReceiptBuilder::new(chain_spec.as_ref(), input, &mut l1_block_info)?.build(); + + // TODO: Is this correct?q + next_log_index += receipt.inner.logs().len(); + ub.with_transaction_receipt(tx_hash, receipt.clone()); + receipts.push(receipt); + } + Err(e) => { + return Err(ExecError::Failed(format!( + "failed to execute transaction: {:?} tx_hash: {:?} sender: {:?}", + e, tx_hash, sender + ))); + } + } + } + + db = evm.into_db(); + ub = ub + .with_db_cache(db.cache) + .with_state_overrides(Some(state_overrides)) + .with_bundle_state(db.db.bundle_state); + + ub.accept_frag_execution(frag, logs, receipts, gas_used); + + self.current_unsealed_block.store(Some(Arc::new(ub))); + + Ok(()) + } + + fn seal(&mut self) -> Result<(), ExecError> { + let ub = self.current_unsealed_block.load_full().ok_or(ExecError::NotInitialized)?; + let withdrawals_hash = if ub.is_prague { + let canonical_block = ub.env.number.saturating_sub(1); + + let state_provider = + self.client.state_by_block_number_or_tag(BlockNumberOrTag::Number(canonical_block)).map_err(|e| { + ExecError::Failed(format!("state_by_block_number_or_tag({canonical_block}) failed: {e}")) + })?; + let bundle_state = ub.get_bundle_state(); + Some(isthmus::withdrawals_root(bundle_state, state_provider)?) + } else { + None + }; + + let block = ub.to_op_block(withdrawals_hash)?; + let sealed = SealedBlock::seal_slow(block); + let recovered = sealed.try_recover().map_err(|e| ExecError::Failed(format!("recover senders: {e}")))?; + + let provider_rw = self.client.database_provider_rw()?; + provider_rw.insert_block(recovered)?; + provider_rw.commit()?; + Ok(()) } - fn execute_frag( - &mut self, - _ub: &UnsealedBlock, - _frag: &FragV0, - ) -> impl Future> + Send + '_ { - async move { Ok(ExecOutput { receipts: vec![], logs: vec![], gas_used_delta: 0 }) } + fn set_canonical(&mut self, b: &OpBlock) -> Result<(), ExecError> { + let sealed = SealedHeader::seal_slow(b.header.clone()); + self.client.set_canonical_head(sealed); + Ok(()) } - fn seal(&mut self, _ub: &UnsealedBlock) -> impl Future> + Send + '_ { - async move { Ok(()) } + fn get_block(&self, hash: B256) -> Result { + if let Some(block) = self + .block_cache + .lock() + .map_err(|_| ExecError::Failed("block_cache mutex poisoned".into()))? + .get(&hash) + .cloned() + { + return Ok(block); + } + + // fetch + let block = self + .client + .block_by_hash(hash) + .map_err(|e| ExecError::Failed(format!("block_by_hash failed: {e}")))? + .ok_or_else(|| ExecError::Failed("pre-sealed block not found".into()))?; + + self.block_cache + .lock() + .map_err(|_| ExecError::Failed("block_cache mutex poisoned".into()))? + .insert(hash, block.clone()); + + Ok(block) } - fn set_canonical(&mut self, _b: &Block) -> impl Future> + Send + '_ { - async move { Ok(()) } + fn reset(&mut self) { + self.current_unsealed_block.store(None); } +} + +fn build_op_block_from_ub_and_frag(ub: &UnsealedBlock, frag: &FragV0) -> Result { + // Decode EIP-2718 tx bytes -> OpTransactionSigned + let tx_list: Vec = frag + .txs + .iter() + .map(|tx_bytes| { + OpTxEnvelope::decode_2718(&mut tx_bytes.as_ref()) + .map_err(|e| ExecError::Failed(format!("decode tx failed: {e}"))) + }) + .collect::, ExecError>>()?; + + let extra_data: Bytes = Bytes::copy_from_slice(ub.env.extra_data.as_ref()); + let header = Header { + parent_hash: ub.env.parent_hash, + ommers_hash: Default::default(), + beneficiary: ub.env.beneficiary, + state_root: B256::ZERO, + transactions_root: B256::ZERO, + receipts_root: B256::ZERO, + logs_bloom: Default::default(), + difficulty: ub.env.difficulty, + number: frag.block_number, + gas_limit: ub.env.gas_limit, + gas_used: ub.cumulative_gas_used, + timestamp: ub.env.timestamp, + extra_data, + mix_hash: ub.env.prevrandao, + nonce: Default::default(), + base_fee_per_gas: Some(ub.env.basefee), + withdrawals_root: None, + blob_gas_used: Some(ub.cumulative_blob_gas_used), + excess_blob_gas: Some(0), + parent_beacon_block_root: Some(ub.env.parent_beacon_block_root), + requests_hash: None, + }; - fn get_block( - &self, - _hash: B256, - _number: BlockNumber, - ) -> impl Future> + Send + '_ { - async move { Ok(Block::default()) } + let body = BlockBody { transactions: tx_list, ommers: vec![], withdrawals: None }; + + Ok(OpBlock::new(header, body)) +} + +fn split_execution_result(result: &ExecutionResult) -> (bool, u64, Vec) { + match result { + ExecutionResult::Success { gas_used, logs, .. } => (true, *gas_used, logs.clone()), + ExecutionResult::Revert { gas_used, .. } => (false, *gas_used, vec![]), + ExecutionResult::Halt { gas_used, .. } => (false, *gas_used, vec![]), } +} - fn reset(&mut self) {} +fn wrap_op_receipt( + tx_type: u8, + receipt: Receipt, + deposit_nonce: Option, + deposit_receipt_version: Option, +) -> Result { + Ok(match tx_type { + 0x00 => OpReceipt::Legacy(receipt), + 0x01 => OpReceipt::Eip2930(receipt), + 0x02 => OpReceipt::Eip1559(receipt), + 0x04 => OpReceipt::Eip7702(receipt), + t if t == op_alloy_consensus::DEPOSIT_TX_TYPE_ID => OpReceipt::Deposit(op_alloy_consensus::OpDepositReceipt { + inner: receipt, + deposit_nonce, + deposit_receipt_version, + }), + other => return Err(ExecError::Failed(format!("unsupported tx type for receipt: 0x{other:02x}"))), + }) } diff --git a/based/crates/reth/src/unsealed_block.rs b/based/crates/reth/src/unsealed_block.rs index 04d3b1a22..8e71f2287 100644 --- a/based/crates/reth/src/unsealed_block.rs +++ b/based/crates/reth/src/unsealed_block.rs @@ -1,14 +1,16 @@ -use alloy_consensus::{Header, TxEnvelope}; -use alloy_eips::eip2718::Decodable2718; +use alloy_consensus::{BlockBody, Header, TxEnvelope}; +use alloy_eips::{eip2718::Decodable2718, eip7685::EMPTY_REQUESTS_HASH}; use alloy_primitives::{Address, B256, Bytes, Sealable, TxHash, U256, map::foldhash::HashMap}; -use alloy_rpc_types::{BlockTransactions, Log, TransactionReceipt, state::StateOverride}; +use alloy_rpc_types::{BlockTransactions, Filter, Log, state::StateOverride}; use alloy_rpc_types_eth::Header as RPCHeader; use bop_common::p2p::{EnvV0, FragV0, Transaction as TxBytes}; -use op_alloy_consensus::OpReceiptEnvelope; +use op_alloy_consensus::{OpBlock, OpTxEnvelope}; use op_alloy_network::{Optimism, TransactionResponse}; -use op_alloy_rpc_types::Transaction; -use reth::revm::db::Cache; +use op_alloy_rpc_types::{OpTransactionReceipt, Transaction}; +use reth::revm::db::{BundleState, Cache}; +use reth_optimism_primitives::OpTransactionSigned; use reth_rpc_eth_api::RpcBlock; +use tokio::sync::broadcast; use crate::error::UnsealedBlockError; @@ -27,24 +29,31 @@ pub struct UnsealedBlock { pub hash: B256, /// Transaction receipts for executed transactions. - pub receipts: Vec, + pub receipts: Vec, /// Flattened logs emitted during execution. pub logs: Vec, /// Cumulative execution gas used across all transactions in the block. pub cumulative_gas_used: u64, /// Cumulative blob gas used across all blob-carrying transactions in the block. pub cumulative_blob_gas_used: u64, + pub is_prague: bool, transaction_count: HashMap, - transaction: Vec, - transaction_receipts: HashMap>>, + transactions: Vec, + transaction_receipts: HashMap, state_overrides: Option, + new_block_sender: broadcast::Sender>, + db_cache: Cache, + bundle_state: BundleState, } impl UnsealedBlock { - pub fn new(env: EnvV0) -> Self { + /// Create a fresh unsealed block state for `env` with empty frags/results/caches. + pub fn new(env: EnvV0, is_prague: bool) -> Self { + let (new_block_sender, _) = broadcast::channel(16); + Self { env, frags: Vec::new(), @@ -54,14 +63,27 @@ impl UnsealedBlock { logs: Vec::new(), cumulative_gas_used: 0, cumulative_blob_gas_used: 0, + is_prague, transaction_count: Default::default(), - transaction: vec![], + transactions: vec![], transaction_receipts: Default::default(), state_overrides: None, + new_block_sender, db_cache: Default::default(), + bundle_state: Default::default(), } } + /// Returns the canonical block number. + pub fn canonical_block_number(&self) -> u64 { + // TODO: Is this correct? + self.env.number.saturating_sub(1) + } + + pub fn subscribe_new_blocks(&self) -> broadcast::Receiver> { + self.new_block_sender.subscribe() + } + /// Returns `true` if no fragments have been added yet. pub fn is_empty(&self) -> bool { self.frags.is_empty() @@ -80,12 +102,12 @@ impl UnsealedBlock { } } - /// Raw tx bytes iterator (flattening frags) + /// Raw tx bytes iterator (flattening frags). pub fn transactions_iter_bytes(&self) -> impl Iterator + '_ { self.frags.iter().flat_map(|frag| frag.txs.iter()) } - /// Decoded txs iterator (lazy decode) + /// Decoded txs iterator (lazy decode). pub fn transactions_iter_decoded(&self) -> impl Iterator> + '_ { self.transactions_iter_bytes().enumerate().map(|(index, tx)| { // allocate a Vec to decode from @@ -94,17 +116,17 @@ impl UnsealedBlock { }) } - /// Decoded txs (allocates Vec), like Go `Transactions()` but decoded - pub fn transactions(&self) -> Result, UnsealedBlockError> { - self.transactions_iter_decoded().collect() + /// Return list of transaction + pub fn transactions(&self) -> Vec { + self.transactions.clone() } - /// Raw tx bytes (allocates Vec>), like Go `ByteTransactions()` + /// Raw tx bytes (allocates Vec>), like Go `ByteTransactions()`. pub fn byte_transactions(&self) -> Vec> { self.transactions_iter_bytes().map(|tx| tx.iter().copied().collect::>()).collect() } - // Return the last frag on the list. + /// Return the last fragment in the list (if any). pub fn last_frag(&self) -> Option<&FragV0> { self.frags.last() } @@ -112,10 +134,21 @@ impl UnsealedBlock { /// Apply the accepted frag into in-memory bookkeeping (NOT executing txs). /// /// Execution results (receipts/logs/gas) should be appended separately. - pub fn accept_frag(&mut self, f: FragV0) { + pub fn accept_frag_execution( + &mut self, + f: FragV0, + logs: Vec, + receipts: Vec, + cummulative_gas_used: u64, + ) { self.last_sequence_number = Some(f.seq); self.cumulative_blob_gas_used = self.cumulative_blob_gas_used.saturating_add(f.blob_gas_used); - self.frags.push(f); + self.frags.push(f.clone()); + self.logs.extend_from_slice(logs.as_slice()); + self.receipts.extend_from_slice(receipts.as_slice()); + self.cumulative_gas_used = cummulative_gas_used; + + let _ = self.new_block_sender.send(self.to_rpc_block(false)); } /// Validate frag against current state (equivalent to your ValidateNewFragV0 + sequencing gate). @@ -172,50 +205,97 @@ impl UnsealedBlock { /// Reset to a fresh env (drop frags/results/counters). pub fn reset_to_env(&mut self, env: EnvV0) { - *self = Self::new(env); + *self = Self::new(env, self.is_prague); + } + + /// Attach/replace the DB cache to carry execution overlay state forward. + pub fn with_db_cache(mut self, cache: Cache) -> Self { + self.db_cache = cache; + self + } + + /// Attach/replace the bundle state to carry execution overlay state forward. + pub fn with_bundle_state(mut self, bundle_state: BundleState) -> Self { + self.bundle_state = bundle_state; + self + } + + /// Attach/replace the state overrides that represent the current overlay diff. + pub fn with_state_overrides(mut self, state_overrides: Option) -> Self { + self.state_overrides = state_overrides; + self + } + + /// Returns the database cache. + pub fn get_db_cache(&self) -> Cache { + self.db_cache.clone() + } + + /// Returns the bundle state. + pub fn get_bundle_state(&self) -> &BundleState { + &self.bundle_state + } + + /// Clone this unsealed block into a mutable working copy for in-place updates. + pub fn clone_for_update(&self) -> Self { + Self { + env: self.env.clone(), + frags: self.frags.clone(), + last_sequence_number: self.last_sequence_number, + hash: self.hash, + receipts: self.receipts.clone(), + logs: self.logs.clone(), + cumulative_gas_used: self.cumulative_gas_used, + cumulative_blob_gas_used: self.cumulative_blob_gas_used, + is_prague: self.is_prague, + transaction_count: self.transaction_count.clone(), + transactions: self.transactions.clone(), + db_cache: self.db_cache.clone(), + state_overrides: self.state_overrides.clone(), + new_block_sender: self.new_block_sender.clone(), + transaction_receipts: self.transaction_receipts.clone(), + bundle_state: self.bundle_state.clone(), + } } /// Returns a cloned list of unsealed logs collected so far. - pub fn get_unsealed_logs(self) -> Vec { - self.logs.clone() + pub fn get_unsealed_logs(&self, filter: &Filter) -> Vec { + self.logs.clone().into_iter().filter(|log| filter.matches(&alloy_primitives::Log::from(log.clone()))).collect() } /// Returns a cloned list of fragments accepted into this unsealed block. - pub fn get_unsealed_frags(self) -> Vec { + pub fn get_unsealed_frags(&self) -> Vec { self.frags.clone() } /// Looks up and returns a cloned transaction receipt by transaction hash, if present. - pub fn get_transaction_receipt(self, tx_hash: B256) -> Option>> { - self.transaction_receipts.get(&tx_hash).cloned() + pub fn get_transaction_receipt(&self, tx_hash: &TxHash) -> Option { + self.transaction_receipts.get(tx_hash).cloned() + } + + /// Looks up and returns a cloned transaction by transaction hash, if present. + pub fn get_transaction(&self, tx_hash: &TxHash) -> Option { + self.transactions.iter().find(|tx| tx.tx_hash() == *tx_hash).cloned() } /// Returns a cloned copy of the current state overrides, if any are set. - pub fn get_state_overrides(self) -> Option { + pub fn get_state_overrides(&self) -> Option { self.state_overrides.clone() } /// Returns the locally tracked transaction count (nonce) for `address`, or zero if unknown. - pub fn get_transaction_count(self, address: Address) -> U256 { + pub fn get_transaction_count(&self, address: Address) -> U256 { self.transaction_count.get(&address).cloned().unwrap_or(U256::from(0)) } /// Returns the cached balance for `address` from the DB cache, if the account is present. - pub fn get_balance(self, address: Address) -> Option { - let Some(account) = self.db_cache.accounts.get(&address) else { - return None; - }; - - Some(account.info.balance) + pub fn get_balance(&self, address: Address) -> Option { + self.db_cache.accounts.get(&address).map(|account| account.info.balance) } - /// Convert current unsealed block into RpcBlock - pub fn to_block(&self, full: bool) -> RpcBlock { - let last_frag_number = match self.frags.last() { - Some(frag) => (frag.block_number), - None => 0, - }; - let header = Header { + /// Return a decoded header snapshot derived from the current env + local counters. + pub fn get_header(&self) -> Header { + Header { parent_hash: self.env.parent_hash, ommers_hash: Default::default(), beneficiary: self.env.beneficiary, @@ -224,7 +304,7 @@ impl UnsealedBlock { receipts_root: B256::ZERO, logs_bloom: Default::default(), difficulty: self.env.difficulty, - number: last_frag_number, + number: self.env.number, gas_limit: self.env.gas_limit, gas_used: self.cumulative_gas_used, timestamp: self.env.timestamp, @@ -237,9 +317,35 @@ impl UnsealedBlock { excess_blob_gas: Some(0), parent_beacon_block_root: Some(self.env.parent_beacon_block_root), requests_hash: None, - }; + } + } + + /// Append a fully materialized transaction to the RPC `transactions` list. + pub(crate) fn with_transaction(&mut self, transaction: Transaction) -> &Self { + self.transactions.push(transaction); + self + } + + /// Insert/replace the receipt for `tx_hash` in the per-tx receipt map. + pub(crate) fn with_transaction_receipt(&mut self, tx_hash: B256, receipt: OpTransactionReceipt) -> &Self { + self.transaction_receipts.insert(tx_hash, receipt); + self + } + + /// Increment the locally tracked nonce for `sender` after accepting a tx. + pub(crate) fn increment_nonce(&mut self, sender: Address) -> &Self { + let zero = U256::from(0); + let current_count = self.transaction_count.get(&sender).unwrap_or(&zero); + + _ = self.transaction_count.insert(sender, *current_count + U256::from(1)); + self + } + + /// Convert current unsealed block into RpcBlock. + pub fn to_rpc_block(&self, full: bool) -> RpcBlock { + let header = self.get_header(); let header = header.clone().seal_slow(); - let block_transactions = self.transaction.clone(); + let block_transactions = self.transactions.clone(); let transactions = if full { BlockTransactions::Full(block_transactions) @@ -255,4 +361,51 @@ impl UnsealedBlock { withdrawals: None, } } + + pub fn to_op_block(&self, withdrawals_hash: Option) -> Result { + // Decode EIP-2718 tx bytes -> OpTransactionSigned + let tx_list: Vec = self + .frags + .iter() + .enumerate() + .flat_map(|(frag_idx, frag)| { + frag.txs.iter().enumerate().map(move |(tx_idx, tx_bytes)| { + OpTxEnvelope::decode_2718(&mut tx_bytes.as_ref()).map_err(|e| { + UnsealedBlockError::Failed(format!("decode tx failed (frag={frag_idx} tx={tx_idx}): {e}")) + }) + }) + }) + .collect::, UnsealedBlockError>>()?; + + let requests_hash = self.is_prague.then_some(EMPTY_REQUESTS_HASH); + + let extra_data: Bytes = Bytes::copy_from_slice(self.env.extra_data.as_ref()); + let header = Header { + parent_hash: self.env.parent_hash, + ommers_hash: Default::default(), + beneficiary: self.env.beneficiary, + state_root: B256::ZERO, + transactions_root: B256::ZERO, + receipts_root: B256::ZERO, + logs_bloom: Default::default(), + difficulty: self.env.difficulty, + number: self.env.number, + gas_limit: self.env.gas_limit, + gas_used: self.cumulative_gas_used, + timestamp: self.env.timestamp, + extra_data, + mix_hash: self.env.prevrandao, + nonce: Default::default(), + base_fee_per_gas: Some(self.env.basefee), + withdrawals_root: withdrawals_hash, + blob_gas_used: Some(self.cumulative_blob_gas_used), + excess_blob_gas: Some(0), + parent_beacon_block_root: Some(self.env.parent_beacon_block_root), + requests_hash, + }; + + let body = BlockBody { transactions: tx_list, ommers: vec![], withdrawals: None }; + + Ok(reth_optimism_primitives::OpBlock::new(header, body)) + } }