diff --git a/based/Cargo.lock b/based/Cargo.lock index f57da8cea..e263ab6e1 100644 --- a/based/Cargo.lock +++ b/based/Cargo.lock @@ -1612,6 +1612,7 @@ dependencies = [ "reth-node-builder 1.9.3", "reth-optimism-chainspec 1.9.3", "reth-optimism-cli 1.9.3", + "reth-optimism-consensus 1.9.3", "reth-optimism-evm 1.9.3", "reth-optimism-node 1.9.3", "reth-optimism-primitives 1.9.3", diff --git a/based/crates/reth/Cargo.toml b/based/crates/reth/Cargo.toml index 553c1cebe..e594d714b 100644 --- a/based/crates/reth/Cargo.toml +++ b/based/crates/reth/Cargo.toml @@ -46,3 +46,4 @@ thiserror.workspace = true tokio.workspace = true tokio-stream.workspace = true tracing.workspace = true +reth_optimism_consensus = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3", package = "reth-optimism-consensus" } diff --git a/based/crates/reth/src/api/eth.rs b/based/crates/reth/src/api/eth.rs index 2f43acf13..6e8835183 100644 --- a/based/crates/reth/src/api/eth.rs +++ b/based/crates/reth/src/api/eth.rs @@ -123,7 +123,7 @@ where if self.use_unsealed_state(&number) && let Some(unsealed_block) = self.unsealed_block.load_full() { - Ok(Some(unsealed_block.to_block(full))) + Ok(Some(unsealed_block.to_rpc_block(full))) } else { EthBlocks::rpc_block(&self.canonical, number.into(), full).await.map_err(Into::into) } diff --git a/based/crates/reth/src/driver.rs b/based/crates/reth/src/driver.rs index 7f864712f..fcf4768a4 100644 --- a/based/crates/reth/src/driver.rs +++ b/based/crates/reth/src/driver.rs @@ -2,7 +2,6 @@ use std::{sync::Arc, time::Instant}; use alloy_consensus::Header; use alloy_primitives::B256; -use alloy_rpc_types::Block; use arc_swap::ArcSwapOption; use bop_common::{ p2p::{EnvV0, FragV0, SealV0}, @@ -10,7 +9,9 @@ use bop_common::{ }; use reth_chainspec::{ChainSpecProvider, EthChainSpec}; use reth_optimism_chainspec::OpHardforks; -use reth_storage_api::{BlockReaderIdExt, StateProviderFactory}; +use reth_storage_api::{ + BlockReaderIdExt, BlockWriter, CanonChainTracker, DatabaseProviderFactory, StateProviderFactory, +}; use tokio::sync::{mpsc, oneshot}; use tracing::{error, info}; @@ -23,6 +24,7 @@ use crate::{ /// Result of submitting a frag to the driver. #[derive(Debug, Clone, Copy)] pub enum FragStatus { + Ignored, Valid, Invalid, } @@ -99,9 +101,12 @@ impl Driver { where Client: StateProviderFactory + ChainSpecProvider + OpHardforks> - + BlockReaderIdExt
+ + BlockReaderIdExt
+ + CanonChainTracker
+ + DatabaseProviderFactory + Clone + 'static, + ::ProviderRW: BlockWriter, { let executor = StateExecutor::new(client); let current_unsealed_block = executor.shared_unsealed_block(); @@ -244,7 +249,7 @@ impl DriverInner { if frag.block_number < ub.env.number { info!(frag_block = frag.block_number, env_number = ub.env.number, "stale frag (older block), ignoring"); - return Ok(FragStatus::Valid); + return Ok(FragStatus::Ignored); } if let Err(e) = ub.validate_new_frag(&frag) { @@ -266,7 +271,7 @@ impl DriverInner { if ub.last_frag().is_some_and(|f| f.is_last) { info!("last frag received, pre-sealing block"); - if let Err(e) = self.exec.seal().await { + if let Err(e) = self.exec.seal() { error!(error = %e, "seal failed, discarding unsealed block"); self.reset_current_unsealed_block(); return Err(DriverError::from(e)); @@ -287,9 +292,7 @@ impl DriverInner { return Ok(()); } - let presealed_block = self.exec.get_block(seal.block_hash, seal.block_number).await; - - let presealed_block = match presealed_block { + let presealed_block = match self.exec.get_block(seal.block_hash) { Ok(b) => b, Err(e) => { self.reset_current_unsealed_block(); @@ -299,7 +302,7 @@ impl DriverInner { self.validate_seal_frag_v0(&presealed_block, ub.as_ref(), &seal)?; - self.exec.set_canonical(&presealed_block).await?; + self.exec.set_canonical(&presealed_block)?; self.reset_current_unsealed_block(); @@ -309,11 +312,11 @@ impl DriverInner { fn validate_seal_frag_v0( &self, - presealed_block: &Block, + presealed_block: &OpBlock, ub: &UnsealedBlock, seal: &SealV0, ) -> Result<(), ValidateSealError> { - let expected_block_hash: B256 = presealed_block.header.hash.into(); + let expected_block_hash: B256 = presealed_block.header.hash_slow(); if expected_block_hash != seal.block_hash { return Err(ValidateSealError::BlockHash { expected: expected_block_hash, got: seal.block_hash }); } diff --git a/based/crates/reth/src/error.rs b/based/crates/reth/src/error.rs index eb77400d8..f96808ff9 100644 --- a/based/crates/reth/src/error.rs +++ b/based/crates/reth/src/error.rs @@ -86,6 +86,9 @@ pub enum UnsealedBlockError { #[error("received frag after last frag already accepted")] AlreadyEnded, + + #[error("operation failed: {0}")] + Failed(String), } #[derive(Debug, Error)] @@ -113,4 +116,7 @@ pub enum ExecError { #[error(transparent)] OpEthApi(#[from] OpEthApiError), + + #[error(transparent)] + UnsealedBlock(#[from] UnsealedBlockError), } diff --git a/based/crates/reth/src/exec.rs b/based/crates/reth/src/exec.rs index 3312c60a4..ff71cfc2c 100644 --- a/based/crates/reth/src/exec.rs +++ b/based/crates/reth/src/exec.rs @@ -1,12 +1,12 @@ -use std::{future::Future, sync::Arc}; +use std::sync::{Arc, Mutex}; use alloy_consensus::{ BlockBody, Header, Receipt, Transaction, transaction::{Recovered, SignerRecoverable, TransactionMeta}, }; use alloy_eips::{BlockNumberOrTag, Typed2718, eip2718::Decodable2718}; -use alloy_primitives::{B256, BlockNumber, Bytes, Sealable}; -use alloy_rpc_types::{Block, Log}; +use alloy_primitives::{B256, Bytes, Sealable}; +use alloy_rpc_types::Log; use arc_swap::ArcSwapOption; use bop_common::{ p2p::{EnvV0, FragV0}, @@ -14,10 +14,15 @@ use bop_common::{ }; use op_alloy_consensus::OpTxEnvelope; use op_alloy_rpc_types::{OpTransactionReceipt, Transaction as RPCTransaction}; -use reth::api::Block as RethBlock; -use reth_chainspec::{ChainSpecProvider, EthChainSpec}; +use reth::{ + api::Block as RethBlock, + network::cache::LruMap, + primitives::{SealedBlock, SealedHeader}, +}; +use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks as _}; use reth_evm::{ConfigureEvm, Evm, op_revm::OpHaltReason}; use reth_optimism_chainspec::OpHardforks; +use reth_optimism_consensus::isthmus; use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; use reth_optimism_primitives::{OpBlock, OpPrimitives, OpReceipt, OpTransactionSigned}; use reth_optimism_rpc::OpReceiptBuilder; @@ -27,11 +32,15 @@ use reth_revm::{ database::StateProviderDatabase, }; use reth_rpc_convert::transaction::ConvertReceiptInput; -use reth_storage_api::{BlockReaderIdExt, StateProviderFactory}; +use reth_storage_api::{ + BlockReaderIdExt, BlockWriter, CanonChainTracker, DBProvider, DatabaseProviderFactory, StateProviderFactory, +}; use revm::database::CacheDB; use crate::{error::ExecError, unsealed_block::UnsealedBlock}; +const BLOCK_CACHE_LIMIT: u32 = 256; + /// This trait is the ONLY place that needs to know about Reth internals. /// Everything else is just state-machine + bookkeeping. pub trait UnsealedExecutor: Send { @@ -43,11 +52,11 @@ pub trait UnsealedExecutor: Send { /// MUST be cumulative: txs execute after all previous frags's txs. fn execute_frag(&mut self, frag: &FragV0) -> Result<(), ExecError>; - fn seal(&mut self) -> impl Future> + Send + '_; + fn seal(&mut self) -> Result<(), ExecError>; - fn set_canonical(&mut self, b: &Block) -> impl Future> + Send + '_; + fn set_canonical(&mut self, b: &OpBlock) -> Result<(), ExecError>; - fn get_block(&self, hash: B256, number: BlockNumber) -> impl Future> + Send + '_; + fn get_block(&self, hash: B256) -> Result; /// Reset overlay state completely. fn reset(&mut self); @@ -56,11 +65,16 @@ pub trait UnsealedExecutor: Send { pub struct StateExecutor { client: Client, current_unsealed_block: Arc>, + block_cache: Mutex>, } impl StateExecutor { pub fn new(client: Client) -> Self { - Self { client, current_unsealed_block: Arc::new(ArcSwapOption::new(None)) } + Self { + client, + current_unsealed_block: Arc::new(ArcSwapOption::new(None)), + block_cache: Mutex::new(LruMap::new(BLOCK_CACHE_LIMIT)), + } } pub fn shared_unsealed_block(&self) -> Arc> { @@ -72,9 +86,12 @@ impl UnsealedExecutor for StateExecutor where Client: StateProviderFactory + ChainSpecProvider + OpHardforks> - + BlockReaderIdExt
+ + BlockReaderIdExt
+ + CanonChainTracker
+ + DatabaseProviderFactory + Clone + 'static, + ::ProviderRW: BlockWriter, { fn ensure_env(&mut self, env: &EnvV0) -> Result<(), ExecError> { let Some(parent) = self.client.block_by_hash(env.parent_hash)? else { @@ -103,7 +120,11 @@ where self.client.state_by_block_number_or_tag(BlockNumberOrTag::Number(parent_header.number))?; let state_provider_db = StateProviderDatabase::new(state_provider); let state = State::builder().with_database(state_provider_db).with_bundle_update().build(); - let ub = UnsealedBlock::new(env.clone()).with_db_cache(CacheDB::new(state).cache); + + // Check if the current block is a prague block + let is_prague = self.client.chain_spec().is_prague_active_at_timestamp(env.timestamp); + + let ub = UnsealedBlock::new(env.clone(), is_prague).with_db_cache(CacheDB::new(state).cache); self.current_unsealed_block.store(Some(Arc::new(ub))); Ok(()) @@ -273,7 +294,10 @@ where } db = evm.into_db(); - ub = ub.with_db_cache(db.cache).with_state_overrides(Some(state_overrides)); + ub = ub + .with_db_cache(db.cache) + .with_state_overrides(Some(state_overrides)) + .with_bundle_state(db.db.bundle_state); ub.accept_frag_execution(frag, logs, receipts, gas_used); @@ -282,20 +306,61 @@ where Ok(()) } - fn seal(&mut self) -> impl Future> + Send + '_ { - async move { Ok(()) } + fn seal(&mut self) -> Result<(), ExecError> { + let ub = self.current_unsealed_block.load_full().ok_or(ExecError::NotInitialized)?; + let withdrawals_hash = if ub.is_prague { + let canonical_block = ub.env.number.saturating_sub(1); + + let state_provider = + self.client.state_by_block_number_or_tag(BlockNumberOrTag::Number(canonical_block)).map_err(|e| { + ExecError::Failed(format!("state_by_block_number_or_tag({canonical_block}) failed: {e}")) + })?; + let bundle_state = ub.get_bundle_state(); + Some(isthmus::withdrawals_root(bundle_state, state_provider)?) + } else { + None + }; + + let block = ub.to_op_block(withdrawals_hash)?; + let sealed = SealedBlock::seal_slow(block); + let recovered = sealed.try_recover().map_err(|e| ExecError::Failed(format!("recover senders: {e}")))?; + + let provider_rw = self.client.database_provider_rw()?; + provider_rw.insert_block(recovered)?; + provider_rw.commit()?; + Ok(()) } - fn set_canonical(&mut self, _b: &Block) -> impl Future> + Send + '_ { - async move { Ok(()) } + fn set_canonical(&mut self, b: &OpBlock) -> Result<(), ExecError> { + let sealed = SealedHeader::seal_slow(b.header.clone()); + self.client.set_canonical_head(sealed); + Ok(()) } - fn get_block( - &self, - _hash: B256, - _number: BlockNumber, - ) -> impl Future> + Send + '_ { - async move { Ok(Block::default()) } + fn get_block(&self, hash: B256) -> Result { + if let Some(block) = self + .block_cache + .lock() + .map_err(|_| ExecError::Failed("block_cache mutex poisoned".into()))? + .get(&hash) + .cloned() + { + return Ok(block); + } + + // fetch + let block = self + .client + .block_by_hash(hash) + .map_err(|e| ExecError::Failed(format!("block_by_hash failed: {e}")))? + .ok_or_else(|| ExecError::Failed("pre-sealed block not found".into()))?; + + self.block_cache + .lock() + .map_err(|_| ExecError::Failed("block_cache mutex poisoned".into()))? + .insert(hash, block.clone()); + + Ok(block) } fn reset(&mut self) { diff --git a/based/crates/reth/src/unsealed_block.rs b/based/crates/reth/src/unsealed_block.rs index 77b922b68..8e71f2287 100644 --- a/based/crates/reth/src/unsealed_block.rs +++ b/based/crates/reth/src/unsealed_block.rs @@ -1,12 +1,14 @@ -use alloy_consensus::{Header, TxEnvelope}; -use alloy_eips::eip2718::Decodable2718; +use alloy_consensus::{BlockBody, Header, TxEnvelope}; +use alloy_eips::{eip2718::Decodable2718, eip7685::EMPTY_REQUESTS_HASH}; use alloy_primitives::{Address, B256, Bytes, Sealable, TxHash, U256, map::foldhash::HashMap}; use alloy_rpc_types::{BlockTransactions, Filter, Log, state::StateOverride}; use alloy_rpc_types_eth::Header as RPCHeader; use bop_common::p2p::{EnvV0, FragV0, Transaction as TxBytes}; +use op_alloy_consensus::{OpBlock, OpTxEnvelope}; use op_alloy_network::{Optimism, TransactionResponse}; use op_alloy_rpc_types::{OpTransactionReceipt, Transaction}; -use reth::revm::db::Cache; +use reth::revm::db::{BundleState, Cache}; +use reth_optimism_primitives::OpTransactionSigned; use reth_rpc_eth_api::RpcBlock; use tokio::sync::broadcast; @@ -34,6 +36,7 @@ pub struct UnsealedBlock { pub cumulative_gas_used: u64, /// Cumulative blob gas used across all blob-carrying transactions in the block. pub cumulative_blob_gas_used: u64, + pub is_prague: bool, transaction_count: HashMap, transactions: Vec, @@ -43,11 +46,12 @@ pub struct UnsealedBlock { new_block_sender: broadcast::Sender>, db_cache: Cache, + bundle_state: BundleState, } impl UnsealedBlock { /// Create a fresh unsealed block state for `env` with empty frags/results/caches. - pub fn new(env: EnvV0) -> Self { + pub fn new(env: EnvV0, is_prague: bool) -> Self { let (new_block_sender, _) = broadcast::channel(16); Self { @@ -59,12 +63,14 @@ impl UnsealedBlock { logs: Vec::new(), cumulative_gas_used: 0, cumulative_blob_gas_used: 0, + is_prague, transaction_count: Default::default(), transactions: vec![], transaction_receipts: Default::default(), state_overrides: None, new_block_sender, db_cache: Default::default(), + bundle_state: Default::default(), } } @@ -110,9 +116,9 @@ impl UnsealedBlock { }) } - /// Decoded txs (allocates Vec), like Go `Transactions()` but decoded. - pub fn transactions(&self) -> Result, UnsealedBlockError> { - self.transactions_iter_decoded().collect() + /// Return list of transaction + pub fn transactions(&self) -> Vec { + self.transactions.clone() } /// Raw tx bytes (allocates Vec>), like Go `ByteTransactions()`. @@ -142,8 +148,7 @@ impl UnsealedBlock { self.receipts.extend_from_slice(receipts.as_slice()); self.cumulative_gas_used = cummulative_gas_used; - // TODO: Is this correct? Is everything applied here? - let _ = self.new_block_sender.send(self.to_block(false)); + let _ = self.new_block_sender.send(self.to_rpc_block(false)); } /// Validate frag against current state (equivalent to your ValidateNewFragV0 + sequencing gate). @@ -200,7 +205,7 @@ impl UnsealedBlock { /// Reset to a fresh env (drop frags/results/counters). pub fn reset_to_env(&mut self, env: EnvV0) { - *self = Self::new(env); + *self = Self::new(env, self.is_prague); } /// Attach/replace the DB cache to carry execution overlay state forward. @@ -209,6 +214,12 @@ impl UnsealedBlock { self } + /// Attach/replace the bundle state to carry execution overlay state forward. + pub fn with_bundle_state(mut self, bundle_state: BundleState) -> Self { + self.bundle_state = bundle_state; + self + } + /// Attach/replace the state overrides that represent the current overlay diff. pub fn with_state_overrides(mut self, state_overrides: Option) -> Self { self.state_overrides = state_overrides; @@ -220,6 +231,11 @@ impl UnsealedBlock { self.db_cache.clone() } + /// Returns the bundle state. + pub fn get_bundle_state(&self) -> &BundleState { + &self.bundle_state + } + /// Clone this unsealed block into a mutable working copy for in-place updates. pub fn clone_for_update(&self) -> Self { Self { @@ -231,12 +247,14 @@ impl UnsealedBlock { logs: self.logs.clone(), cumulative_gas_used: self.cumulative_gas_used, cumulative_blob_gas_used: self.cumulative_blob_gas_used, - transaction_count: Default::default(), - transactions: vec![], + is_prague: self.is_prague, + transaction_count: self.transaction_count.clone(), + transactions: self.transactions.clone(), db_cache: self.db_cache.clone(), state_overrides: self.state_overrides.clone(), new_block_sender: self.new_block_sender.clone(), - transaction_receipts: Default::default(), + transaction_receipts: self.transaction_receipts.clone(), + bundle_state: self.bundle_state.clone(), } } @@ -324,7 +342,7 @@ impl UnsealedBlock { } /// Convert current unsealed block into RpcBlock. - pub fn to_block(&self, full: bool) -> RpcBlock { + pub fn to_rpc_block(&self, full: bool) -> RpcBlock { let header = self.get_header(); let header = header.clone().seal_slow(); let block_transactions = self.transactions.clone(); @@ -343,4 +361,51 @@ impl UnsealedBlock { withdrawals: None, } } + + pub fn to_op_block(&self, withdrawals_hash: Option) -> Result { + // Decode EIP-2718 tx bytes -> OpTransactionSigned + let tx_list: Vec = self + .frags + .iter() + .enumerate() + .flat_map(|(frag_idx, frag)| { + frag.txs.iter().enumerate().map(move |(tx_idx, tx_bytes)| { + OpTxEnvelope::decode_2718(&mut tx_bytes.as_ref()).map_err(|e| { + UnsealedBlockError::Failed(format!("decode tx failed (frag={frag_idx} tx={tx_idx}): {e}")) + }) + }) + }) + .collect::, UnsealedBlockError>>()?; + + let requests_hash = self.is_prague.then_some(EMPTY_REQUESTS_HASH); + + let extra_data: Bytes = Bytes::copy_from_slice(self.env.extra_data.as_ref()); + let header = Header { + parent_hash: self.env.parent_hash, + ommers_hash: Default::default(), + beneficiary: self.env.beneficiary, + state_root: B256::ZERO, + transactions_root: B256::ZERO, + receipts_root: B256::ZERO, + logs_bloom: Default::default(), + difficulty: self.env.difficulty, + number: self.env.number, + gas_limit: self.env.gas_limit, + gas_used: self.cumulative_gas_used, + timestamp: self.env.timestamp, + extra_data, + mix_hash: self.env.prevrandao, + nonce: Default::default(), + base_fee_per_gas: Some(self.env.basefee), + withdrawals_root: withdrawals_hash, + blob_gas_used: Some(self.cumulative_blob_gas_used), + excess_blob_gas: Some(0), + parent_beacon_block_root: Some(self.env.parent_beacon_block_root), + requests_hash, + }; + + let body = BlockBody { transactions: tx_list, ommers: vec![], withdrawals: None }; + + Ok(reth_optimism_primitives::OpBlock::new(header, body)) + } }