diff --git a/Cargo.toml b/Cargo.toml index 2422098..3e86322 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,11 +29,7 @@ zenith-types = "0.15" alloy = { version = "=0.11.1", features = ["full", "json-rpc", "signer-aws", "rpc-types-mev", "rlp", "node-bindings"] } trevm = { version = "0.19.12", features = [ "concurrent-db" ]} -revm = { version = "19.6.0", features = [ "alloydb" ]} - -# HACK: Update these to use main alloy package -alloy-provider = { version = "0.7.3" } -alloy-eips = { version = "0.7.3" } +revm = { git="https://github.com/bluealloy/revm.git", tag="v59", features = [ "alloydb" ]} aws-config = "1.1.7" aws-sdk-kms = "1.15.0" diff --git a/bin/builder.rs b/bin/builder.rs index 193a3b9..7a58a4a 100644 --- a/bin/builder.rs +++ b/bin/builder.rs @@ -3,11 +3,16 @@ use builder::config::BuilderConfig; use builder::service::serve_builder_with_span; use builder::tasks::block::BlockBuilder; +use builder::tasks::bundler::BundlePoller; use builder::tasks::metrics::MetricsTask; use builder::tasks::oauth::Authenticator; +use builder::tasks::simulator::eval_fn; use builder::tasks::submit::SubmitTask; +use builder::tasks::tx_poller::{self, TxPoller}; +use revm::primitives::U256; use tokio::select; +use trevm::revm::primitives::ResultAndState; #[tokio::main] async fn main() -> eyre::Result<()> { @@ -16,17 +21,22 @@ async fn main() -> eyre::Result<()> { let span = tracing::info_span!("zenith-builder"); let config = BuilderConfig::load_from_env()?.clone(); + let host_provider = config.connect_host_provider().await?; let ru_provider = config.connect_ru_provider().await?; - let authenticator = Authenticator::new(&config); - - tracing::debug!(rpc_url = config.host_rpc_url.as_ref(), "instantiated provider"); + tracing::info!(rpc_url = config.host_rpc_url.as_ref(), "instantiated provider"); let sequencer_signer = config.connect_sequencer_signer().await?; let zenith = config.connect_zenith(host_provider.clone()); + tracing::info!("instantiated zenith"); let metrics = MetricsTask { host_provider: host_provider.clone() }; let (tx_channel, metrics_jh) = metrics.spawn(); + tracing::info!("instantiated zenith"); + + let authenticator = Authenticator::new(&config); + let authenticator_jh = authenticator.spawn(); + tracing::info!("instantiated authenticator"); let builder = BlockBuilder::new(&config, authenticator.clone(), ru_provider.clone()); let submit = SubmitTask { @@ -38,10 +48,15 @@ async fn main() -> eyre::Result<()> { config: config.clone(), outbound_tx_channel: tx_channel, }; + + let tx_poller = TxPoller::new_with_poll_interval_ms(&config, 1000); + let (tx_channel, tx_jh) = tx_poller.spawn(); + + let bundle_poller = BundlePoller::new_with_poll_interval_ms(&config, authenticator, 1); + let bundle_channel = bundle_poller.spawn(); - let authenticator_jh = authenticator.spawn(); let (submit_channel, submit_jh) = submit.spawn(); - let build_jh = builder.spawn(submit_channel); + let build_jh = builder.spawn(tx_channel, bundle_channel, submit_channel); let port = config.builder_port; let server = serve_builder_with_span(([0, 0, 0, 0], port), span); @@ -62,6 +77,9 @@ async fn main() -> eyre::Result<()> { _ = authenticator_jh => { tracing::info!("authenticator finished"); } + _ = tx_jh => { + tracing::info!("tx-poller finished"); + } } tracing::info!("shutting down"); diff --git a/src/lib.rs b/src/lib.rs index 02632c5..3ab4b90 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,7 +27,5 @@ pub mod tasks; /// Utilities. pub mod utils; -use alloy_eips as _; -use alloy_provider as _; /// Anonymous crate dependency imports. use openssl as _; diff --git a/src/tasks/block.rs b/src/tasks/block.rs index a6e1b6e..c48b02f 100644 --- a/src/tasks/block.rs +++ b/src/tasks/block.rs @@ -1,17 +1,23 @@ -use super::bundler::{Bundle, BundlePoller}; -use super::oauth::Authenticator; -use super::tx_poller::TxPoller; use crate::config::{BuilderConfig, WalletlessProvider}; +use crate::tasks::bundler::{Bundle, BundlePoller}; +use crate::tasks::oauth::Authenticator; +use crate::tasks::simulator::{eval_fn, SimulatorFactory}; +use crate::tasks::tx_poller::TxPoller; use alloy::{ consensus::{SidecarBuilder, SidecarCoder, TxEnvelope}, eips::eip2718::Decodable2718, + network::Ethereum, primitives::{keccak256, Bytes, B256}, - providers::Provider as _, + providers::Provider, rlp::Buf, }; -use std::time::{SystemTime, UNIX_EPOCH}; -use std::{sync::OnceLock, time::Duration}; -use tokio::{sync::mpsc, task::JoinHandle}; +use revm::database::AlloyDB; +use std::{ + sync::{Arc, OnceLock}, + time::Duration, + time::{SystemTime, UNIX_EPOCH}, +}; +use tokio::{sync::mpsc, task::JoinHandle, time::Instant}; use tracing::{debug, error, info, trace, Instrument}; use zenith_types::{encode_txns, Alloy2718Coder, ZenithEthBundle}; @@ -168,26 +174,7 @@ impl BlockBuilder { } } - /// Fetches bundles from the cache and ingests them into the in progress block - async fn get_bundles(&mut self, in_progress: &mut InProgressBlock) { - trace!("query bundles from cache"); - let bundles = self.bundle_poller.check_bundle_cache().await; - match bundles { - Ok(bundles) => { - for bundle in bundles { - match self.simulate_bundle(&bundle.bundle).await { - Ok(()) => in_progress.ingest_bundle(bundle.clone()), - Err(e) => error!(error = %e, id = ?bundle.id, "bundle simulation failed"), - } - } - } - Err(e) => { - error!(error = %e, "error polling bundles"); - } - } - } - - /// Simulates a Zenith bundle against the rollup state + /// Simulates a Zenith bundle against the rollup state. async fn simulate_bundle(&mut self, bundle: &ZenithEthBundle) -> eyre::Result<()> { // TODO: Simulate bundles with the Simulation Engine // [ENG-672](https://linear.app/initiates/issue/ENG-672/add-support-for-bundles) @@ -230,32 +217,51 @@ impl BlockBuilder { /// Spawn the block builder task, returning the inbound channel to it, and /// a handle to the running task. - pub fn spawn(mut self, outbound: mpsc::UnboundedSender<InProgressBlock>) -> JoinHandle<()> { + pub fn spawn<E>( + mut self, + inbound_tx: mpsc::UnboundedReceiver<TxEnvelope>, + inbound_bundle: mpsc::UnboundedReceiver<()>, + outbound: mpsc::UnboundedSender<InProgressBlock>, + ) -> JoinHandle<()> { tokio::spawn( async move { loop { - // sleep the buffer time + // Sleep the buffer time during block wake up tokio::time::sleep(Duration::from_secs(self.secs_to_next_target())).await; info!("beginning block build cycle"); - // Build a block - let mut in_progress = InProgressBlock::default(); - self.get_transactions(&mut in_progress).await; - self.get_bundles(&mut in_progress).await; - - // Filter confirmed transactions from the block - self.filter_transactions(&mut in_progress).await; - - // submit the block if it has transactions - if !in_progress.is_empty() { - debug!(txns = in_progress.len(), "sending block to submit task"); - let in_progress_block = std::mem::take(&mut in_progress); - if outbound.send(in_progress_block).is_err() { - error!("downstream task gone"); - break; + // Setup a simulator factory + let ru_provider = self.ru_provider.clone(); + let latest = ru_provider.get_block_number().await.unwrap(); + let db: AlloyDB<Ethereum, WalletlessProvider> = AlloyDB::new( + ru_provider.into(), + alloy::eips::BlockId::Number(latest.into()), + ); + let sim = SimulatorFactory::new(db, ()); + + // Calculate the deadline + let time_to_next_slot = self.secs_to_next_slot(); + let now = Instant::now(); + let deadline = now.checked_add(Duration::from_secs(time_to_next_slot)).unwrap(); + + // Run the simulation until the deadline + let sim_result = + sim.spawn(inbound_tx, inbound_bundle, Arc::new(eval_fn), deadline).await; + + // Handle simulation results + if let Ok(in_progress) = sim_result { + if !in_progress.is_empty() { + debug!(txns = in_progress.len(), "sending block to submit task"); + let in_progress_block = std::mem::take(&mut in_progress); + + // Send the received block and error if there's an issue + if outbound.send(in_progress_block).is_err() { + error!("downstream task gone"); + break; + } + } else { + debug!("no transactions, skipping block submission"); } - } else { - debug!("no transactions, skipping block submission"); } } } diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs index 2a4ce22..1331b06 100644 --- a/src/tasks/simulator.rs +++ b/src/tasks/simulator.rs @@ -2,14 +2,15 @@ use crate::tasks::block::InProgressBlock; use alloy::consensus::TxEnvelope; use alloy::primitives::U256; use eyre::Result; -use revm::{db::CacheDB, primitives::CfgEnv, DatabaseRef}; +use revm::primitives::address; use std::{convert::Infallible, sync::Arc}; -use tokio::{select, sync::mpsc::UnboundedReceiver, task::JoinSet}; +use tokio::{select, sync::mpsc::Receiver, task::JoinSet}; +use tracing::debug; use trevm::{ db::sync::{ConcurrentState, ConcurrentStateInfo}, revm::{ - primitives::{EVMError, ResultAndState}, - Database, DatabaseCommit, EvmBuilder, + primitives::{Account, CfgEnv, EVMError, ExecutionResult, ResultAndState}, + Database, DatabaseCommit, DatabaseRef, EvmBuilder, }, BlockDriver, Cfg, DbConnect, EvmFactory, NoopBlock, TrevmBuilder, Tx, }; @@ -56,8 +57,8 @@ where /// * This function always returns whatever the latest finished in progress block is. pub fn spawn<T, F>( self, - mut inbound_tx: UnboundedReceiver<Arc<TxEnvelope>>, - _inbound_bundle: UnboundedReceiver<Arc<Vec<TxEnvelope>>>, + mut inbound_tx: Receiver<TxEnvelope>, + _inbound_bundle: Receiver<()>, evaluator: Arc<F>, deadline: tokio::time::Instant, ) -> tokio::task::JoinHandle<InProgressBlock> @@ -84,12 +85,12 @@ where if let Some(inbound_tx) = tx { // Setup the simulation environment let sim = self.clone(); - let eval = evaluator.clone(); let mut parent_db = Arc::new(sim.connect().unwrap()); + let eval_fn = evaluator.clone(); // Kick off the work in a new thread join_set.spawn(async move { - let result = sim.simulate_tx(inbound_tx, eval, parent_db.child()); + let result = sim.simulate_tx(inbound_tx, eval_fn, parent_db.child()); if let Some((best, db)) = result { if let Ok(()) = parent_db.merge_child(db) { @@ -134,7 +135,7 @@ where /// Simulates an inbound tx and applies its state if it's successfully simualted pub fn simulate_tx<F>( self, - tx: Arc<TxEnvelope>, + tx: TxEnvelope, evaluator: Arc<F>, db: ConcurrentState<Arc<ConcurrentState<Db>>>, ) -> SimResult<Db> @@ -147,7 +148,7 @@ where let result = trevm_instance .fill_cfg(&PecorinoCfg) .fill_block(&NoopBlock) - .fill_tx(tx.as_ref()) // Use as_ref() to get &SimTxEnvelope from Arc + .fill_tx(&tx) // Use as_ref() to get &SimTxEnvelope from Arc .run(); match result { @@ -164,7 +165,7 @@ where let db = t.1.into_db(); // return the updated db with the candidate applied to its state - Some((Best { tx, result, score }, db)) + Some((Best { tx: tx.into(), result, score }, db)) } Err(e) => { // if this transaction fails to run, log the error and return None @@ -173,20 +174,24 @@ where } } } +} - /// Simulates an inbound bundle and applies its state if it's successfully simulated - pub fn simulate_bundle<T, F>( - &self, - _bundle: Arc<Vec<T>>, - _evaluator: Arc<F>, - _trevm_instance: trevm::EvmNeedsCfg<'_, (), ConcurrentState<CacheDB<Arc<Db>>>>, - ) -> Option<Best<Vec<T>>> - where - T: Tx + Send + Sync + 'static, - F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, - { - todo!("implement bundle handling") +/// Simple evaluation function for builder scoring. +pub fn eval_fn(state: &ResultAndState) -> U256 { + // log the transaction results + match &state.result { + ExecutionResult::Success { .. } => debug!("execution successful"), + ExecutionResult::Revert { .. } => debug!("execution reverted"), + ExecutionResult::Halt { .. } => debug!("execution halted"), } + + // return the target account balance + let target_addr = address!("0x0000000000000000000000000000000000000000"); + let default_account = Account::default(); + let target_account = state.state.get(&target_addr).unwrap_or(&default_account); + tracing::info!(balance = ?target_account.info.balance, "target account balance"); + + target_account.info.balance } /// Wraps a Db into an EvmFactory compatible [`Database`] diff --git a/tests/simulator_test.rs b/tests/simulator_test.rs index ae12fa7..0823c34 100644 --- a/tests/simulator_test.rs +++ b/tests/simulator_test.rs @@ -1,25 +1,26 @@ -use alloy::consensus::{SignableTransaction as _, TxEip1559, TxEnvelope}; -use alloy::primitives::U256; -use alloy::signers::local::PrivateKeySigner; -use alloy::signers::SignerSync as _; +use alloy::{ + consensus::{TxEnvelope, TxEip1559, SignableTransaction}, + eips::BlockId, + primitives::U256, + providers::{Provider, ProviderBuilder}, + signers::local::PrivateKeySigner, + signers::SignerSync as _, +}; use builder::tasks::simulator::SimulatorFactory; -use revm::db::{AlloyDB, CacheDB}; -use revm::primitives::{address, TxKind}; +use revm::{ + database::{AlloyDB, CacheDB}, + primitives::{address, TxKind}, +}; use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::{Duration, Instant}; use trevm::revm::primitives::{Account, ExecutionResult, ResultAndState}; -// HACK: These have to be pinned to 0.7.3 because of revm version issues. -// Once revm is updated, use the main alloy package again. -use alloy_eips::BlockId; -use alloy_provider::{Provider, ProviderBuilder}; - #[tokio::test(flavor = "multi_thread")] async fn test_spawn() { // Setup transaction pipeline plumbing - let (tx_sender, tx_receiver) = mpsc::unbounded_channel::<Arc<TxEnvelope>>(); - let (_bundle_sender, bundle_receiver) = mpsc::unbounded_channel::<Arc<Vec<TxEnvelope>>>(); + let (tx_sender, tx_receiver) = mpsc::unbounded_channel::<TxEnvelope>(); + let (_bundle_sender, bundle_receiver) = mpsc::unbounded_channel::<Vec<TxEnvelope>>(); let deadline = Instant::now() + Duration::from_secs(2); // Create a new anvil instance @@ -35,8 +36,7 @@ async fn test_spawn() { let latest = root_provider.get_block_number().await.unwrap(); // Create an alloyDB from the provider at the latest height - let alloy_db = - AlloyDB::new(Arc::new(root_provider.clone()), BlockId::Number(latest.into())).unwrap(); + let alloy_db = AlloyDB::new(Arc::new(root_provider.clone()), BlockId::Number(latest.into())); let db = CacheDB::new(Arc::new(alloy_db)); // Define trevm extension, if any