Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] wires up the block and submit task to use simulation #64

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@ zenith-types = "0.15"
alloy = { version = "=0.11.1", features = ["full", "json-rpc", "signer-aws", "rpc-types-mev", "rlp", "node-bindings"] }

trevm = { version = "0.19.12", features = [ "concurrent-db" ]}
revm = { version = "19.6.0", features = [ "alloydb" ]}

# HACK: Update these to use main alloy package
alloy-provider = { version = "0.7.3" }
alloy-eips = { version = "0.7.3" }
revm = { git="https://github.com/bluealloy/revm.git", tag="v59", features = [ "alloydb" ]}

aws-config = "1.1.7"
aws-sdk-kms = "1.15.0"
Expand Down
28 changes: 23 additions & 5 deletions bin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
use builder::config::BuilderConfig;
use builder::service::serve_builder_with_span;
use builder::tasks::block::BlockBuilder;
use builder::tasks::bundler::BundlePoller;
use builder::tasks::metrics::MetricsTask;
use builder::tasks::oauth::Authenticator;
use builder::tasks::simulator::eval_fn;
use builder::tasks::submit::SubmitTask;

use builder::tasks::tx_poller::{self, TxPoller};
use revm::primitives::U256;
use tokio::select;
use trevm::revm::primitives::ResultAndState;

#[tokio::main]
async fn main() -> eyre::Result<()> {
Expand All @@ -16,17 +21,22 @@ async fn main() -> eyre::Result<()> {
let span = tracing::info_span!("zenith-builder");

let config = BuilderConfig::load_from_env()?.clone();

let host_provider = config.connect_host_provider().await?;
let ru_provider = config.connect_ru_provider().await?;
let authenticator = Authenticator::new(&config);

tracing::debug!(rpc_url = config.host_rpc_url.as_ref(), "instantiated provider");
tracing::info!(rpc_url = config.host_rpc_url.as_ref(), "instantiated provider");

let sequencer_signer = config.connect_sequencer_signer().await?;
let zenith = config.connect_zenith(host_provider.clone());
tracing::info!("instantiated zenith");

let metrics = MetricsTask { host_provider: host_provider.clone() };
let (tx_channel, metrics_jh) = metrics.spawn();
tracing::info!("instantiated zenith");

let authenticator = Authenticator::new(&config);
let authenticator_jh = authenticator.spawn();
tracing::info!("instantiated authenticator");

let builder = BlockBuilder::new(&config, authenticator.clone(), ru_provider.clone());
let submit = SubmitTask {
Expand All @@ -38,10 +48,15 @@ async fn main() -> eyre::Result<()> {
config: config.clone(),
outbound_tx_channel: tx_channel,
};

let tx_poller = TxPoller::new_with_poll_interval_ms(&config, 1000);
let (tx_channel, tx_jh) = tx_poller.spawn();

let bundle_poller = BundlePoller::new_with_poll_interval_ms(&config, authenticator, 1);
let bundle_channel = bundle_poller.spawn();

let authenticator_jh = authenticator.spawn();
let (submit_channel, submit_jh) = submit.spawn();
let build_jh = builder.spawn(submit_channel);
let build_jh = builder.spawn(tx_channel, bundle_channel, submit_channel);

let port = config.builder_port;
let server = serve_builder_with_span(([0, 0, 0, 0], port), span);
Expand All @@ -62,6 +77,9 @@ async fn main() -> eyre::Result<()> {
_ = authenticator_jh => {
tracing::info!("authenticator finished");
}
_ = tx_jh => {
tracing::info!("tx-poller finished");
}
}

tracing::info!("shutting down");
Expand Down
2 changes: 0 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,5 @@ pub mod tasks;
/// Utilities.
pub mod utils;

use alloy_eips as _;
use alloy_provider as _;
/// Anonymous crate dependency imports.
use openssl as _;
98 changes: 52 additions & 46 deletions src/tasks/block.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
use super::bundler::{Bundle, BundlePoller};
use super::oauth::Authenticator;
use super::tx_poller::TxPoller;
use crate::config::{BuilderConfig, WalletlessProvider};
use crate::tasks::bundler::{Bundle, BundlePoller};
use crate::tasks::oauth::Authenticator;
use crate::tasks::simulator::{eval_fn, SimulatorFactory};
use crate::tasks::tx_poller::TxPoller;
use alloy::{
consensus::{SidecarBuilder, SidecarCoder, TxEnvelope},
eips::eip2718::Decodable2718,
network::Ethereum,
primitives::{keccak256, Bytes, B256},
providers::Provider as _,
providers::Provider,
rlp::Buf,
};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{sync::OnceLock, time::Duration};
use tokio::{sync::mpsc, task::JoinHandle};
use revm::database::AlloyDB;
use std::{
sync::{Arc, OnceLock},
time::Duration,
time::{SystemTime, UNIX_EPOCH},
};
use tokio::{sync::mpsc, task::JoinHandle, time::Instant};
use tracing::{debug, error, info, trace, Instrument};
use zenith_types::{encode_txns, Alloy2718Coder, ZenithEthBundle};

Expand Down Expand Up @@ -168,26 +174,7 @@ impl BlockBuilder {
}
}

/// Fetches bundles from the cache and ingests them into the in progress block
async fn get_bundles(&mut self, in_progress: &mut InProgressBlock) {
trace!("query bundles from cache");
let bundles = self.bundle_poller.check_bundle_cache().await;
match bundles {
Ok(bundles) => {
for bundle in bundles {
match self.simulate_bundle(&bundle.bundle).await {
Ok(()) => in_progress.ingest_bundle(bundle.clone()),
Err(e) => error!(error = %e, id = ?bundle.id, "bundle simulation failed"),
}
}
}
Err(e) => {
error!(error = %e, "error polling bundles");
}
}
}

/// Simulates a Zenith bundle against the rollup state
/// Simulates a Zenith bundle against the rollup state.
async fn simulate_bundle(&mut self, bundle: &ZenithEthBundle) -> eyre::Result<()> {
// TODO: Simulate bundles with the Simulation Engine
// [ENG-672](https://linear.app/initiates/issue/ENG-672/add-support-for-bundles)
Expand Down Expand Up @@ -230,32 +217,51 @@ impl BlockBuilder {

/// Spawn the block builder task, returning the inbound channel to it, and
/// a handle to the running task.
pub fn spawn(mut self, outbound: mpsc::UnboundedSender<InProgressBlock>) -> JoinHandle<()> {
pub fn spawn<E>(
mut self,
inbound_tx: mpsc::UnboundedReceiver<TxEnvelope>,
inbound_bundle: mpsc::UnboundedReceiver<()>,
outbound: mpsc::UnboundedSender<InProgressBlock>,
) -> JoinHandle<()> {
tokio::spawn(
async move {
loop {
// sleep the buffer time
// Sleep the buffer time during block wake up
tokio::time::sleep(Duration::from_secs(self.secs_to_next_target())).await;
info!("beginning block build cycle");

// Build a block
let mut in_progress = InProgressBlock::default();
self.get_transactions(&mut in_progress).await;
self.get_bundles(&mut in_progress).await;

// Filter confirmed transactions from the block
self.filter_transactions(&mut in_progress).await;

// submit the block if it has transactions
if !in_progress.is_empty() {
debug!(txns = in_progress.len(), "sending block to submit task");
let in_progress_block = std::mem::take(&mut in_progress);
if outbound.send(in_progress_block).is_err() {
error!("downstream task gone");
break;
// Setup a simulator factory
let ru_provider = self.ru_provider.clone();
let latest = ru_provider.get_block_number().await.unwrap();
let db: AlloyDB<Ethereum, WalletlessProvider> = AlloyDB::new(
ru_provider.into(),
alloy::eips::BlockId::Number(latest.into()),
);
let sim = SimulatorFactory::new(db, ());

// Calculate the deadline
let time_to_next_slot = self.secs_to_next_slot();
let now = Instant::now();
let deadline = now.checked_add(Duration::from_secs(time_to_next_slot)).unwrap();

// Run the simulation until the deadline
let sim_result =
sim.spawn(inbound_tx, inbound_bundle, Arc::new(eval_fn), deadline).await;

// Handle simulation results
if let Ok(in_progress) = sim_result {
if !in_progress.is_empty() {
debug!(txns = in_progress.len(), "sending block to submit task");
let in_progress_block = std::mem::take(&mut in_progress);

// Send the received block and error if there's an issue
if outbound.send(in_progress_block).is_err() {
error!("downstream task gone");
break;
}
} else {
debug!("no transactions, skipping block submission");
}
} else {
debug!("no transactions, skipping block submission");
}
}
}
Expand Down
51 changes: 28 additions & 23 deletions src/tasks/simulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use crate::tasks::block::InProgressBlock;
use alloy::consensus::TxEnvelope;
use alloy::primitives::U256;
use eyre::Result;
use revm::{db::CacheDB, primitives::CfgEnv, DatabaseRef};
use revm::primitives::address;
use std::{convert::Infallible, sync::Arc};
use tokio::{select, sync::mpsc::UnboundedReceiver, task::JoinSet};
use tokio::{select, sync::mpsc::Receiver, task::JoinSet};
use tracing::debug;
use trevm::{
db::sync::{ConcurrentState, ConcurrentStateInfo},
revm::{
primitives::{EVMError, ResultAndState},
Database, DatabaseCommit, EvmBuilder,
primitives::{Account, CfgEnv, EVMError, ExecutionResult, ResultAndState},
Database, DatabaseCommit, DatabaseRef, EvmBuilder,
},
BlockDriver, Cfg, DbConnect, EvmFactory, NoopBlock, TrevmBuilder, Tx,
};
Expand Down Expand Up @@ -56,8 +57,8 @@ where
/// * This function always returns whatever the latest finished in progress block is.
pub fn spawn<T, F>(
self,
mut inbound_tx: UnboundedReceiver<Arc<TxEnvelope>>,
_inbound_bundle: UnboundedReceiver<Arc<Vec<TxEnvelope>>>,
mut inbound_tx: Receiver<TxEnvelope>,
_inbound_bundle: Receiver<()>,
evaluator: Arc<F>,
deadline: tokio::time::Instant,
) -> tokio::task::JoinHandle<InProgressBlock>
Expand All @@ -84,12 +85,12 @@ where
if let Some(inbound_tx) = tx {
// Setup the simulation environment
let sim = self.clone();
let eval = evaluator.clone();
let mut parent_db = Arc::new(sim.connect().unwrap());
let eval_fn = evaluator.clone();

// Kick off the work in a new thread
join_set.spawn(async move {
let result = sim.simulate_tx(inbound_tx, eval, parent_db.child());
let result = sim.simulate_tx(inbound_tx, eval_fn, parent_db.child());

if let Some((best, db)) = result {
if let Ok(()) = parent_db.merge_child(db) {
Expand Down Expand Up @@ -134,7 +135,7 @@ where
/// Simulates an inbound tx and applies its state if it's successfully simualted
pub fn simulate_tx<F>(
self,
tx: Arc<TxEnvelope>,
tx: TxEnvelope,
evaluator: Arc<F>,
db: ConcurrentState<Arc<ConcurrentState<Db>>>,
) -> SimResult<Db>
Expand All @@ -147,7 +148,7 @@ where
let result = trevm_instance
.fill_cfg(&PecorinoCfg)
.fill_block(&NoopBlock)
.fill_tx(tx.as_ref()) // Use as_ref() to get &SimTxEnvelope from Arc
.fill_tx(&tx) // Use as_ref() to get &SimTxEnvelope from Arc
.run();

match result {
Expand All @@ -164,7 +165,7 @@ where
let db = t.1.into_db();

// return the updated db with the candidate applied to its state
Some((Best { tx, result, score }, db))
Some((Best { tx: tx.into(), result, score }, db))
}
Err(e) => {
// if this transaction fails to run, log the error and return None
Expand All @@ -173,20 +174,24 @@ where
}
}
}
}

/// Simulates an inbound bundle and applies its state if it's successfully simulated
pub fn simulate_bundle<T, F>(
&self,
_bundle: Arc<Vec<T>>,
_evaluator: Arc<F>,
_trevm_instance: trevm::EvmNeedsCfg<'_, (), ConcurrentState<CacheDB<Arc<Db>>>>,
) -> Option<Best<Vec<T>>>
where
T: Tx + Send + Sync + 'static,
F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static,
{
todo!("implement bundle handling")
/// Simple evaluation function for builder scoring.
pub fn eval_fn(state: &ResultAndState) -> U256 {
// log the transaction results
match &state.result {
ExecutionResult::Success { .. } => debug!("execution successful"),
ExecutionResult::Revert { .. } => debug!("execution reverted"),
ExecutionResult::Halt { .. } => debug!("execution halted"),
}

// return the target account balance
let target_addr = address!("0x0000000000000000000000000000000000000000");
let default_account = Account::default();
let target_account = state.state.get(&target_addr).unwrap_or(&default_account);
tracing::info!(balance = ?target_account.info.balance, "target account balance");

target_account.info.balance
}

/// Wraps a Db into an EvmFactory compatible [`Database`]
Expand Down
30 changes: 15 additions & 15 deletions tests/simulator_test.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
use alloy::consensus::{SignableTransaction as _, TxEip1559, TxEnvelope};
use alloy::primitives::U256;
use alloy::signers::local::PrivateKeySigner;
use alloy::signers::SignerSync as _;
use alloy::{
consensus::{TxEnvelope, TxEip1559, SignableTransaction},
eips::BlockId,
primitives::U256,
providers::{Provider, ProviderBuilder},
signers::local::PrivateKeySigner,
signers::SignerSync as _,
};
use builder::tasks::simulator::SimulatorFactory;
use revm::db::{AlloyDB, CacheDB};
use revm::primitives::{address, TxKind};
use revm::{
database::{AlloyDB, CacheDB},
primitives::{address, TxKind},
};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};
use trevm::revm::primitives::{Account, ExecutionResult, ResultAndState};

// HACK: These have to be pinned to 0.7.3 because of revm version issues.
// Once revm is updated, use the main alloy package again.
use alloy_eips::BlockId;
use alloy_provider::{Provider, ProviderBuilder};

#[tokio::test(flavor = "multi_thread")]
async fn test_spawn() {
// Setup transaction pipeline plumbing
let (tx_sender, tx_receiver) = mpsc::unbounded_channel::<Arc<TxEnvelope>>();
let (_bundle_sender, bundle_receiver) = mpsc::unbounded_channel::<Arc<Vec<TxEnvelope>>>();
let (tx_sender, tx_receiver) = mpsc::unbounded_channel::<TxEnvelope>();
let (_bundle_sender, bundle_receiver) = mpsc::unbounded_channel::<Vec<TxEnvelope>>();
let deadline = Instant::now() + Duration::from_secs(2);

// Create a new anvil instance
Expand All @@ -35,8 +36,7 @@ async fn test_spawn() {
let latest = root_provider.get_block_number().await.unwrap();

// Create an alloyDB from the provider at the latest height
let alloy_db =
AlloyDB::new(Arc::new(root_provider.clone()), BlockId::Number(latest.into())).unwrap();
let alloy_db = AlloyDB::new(Arc::new(root_provider.clone()), BlockId::Number(latest.into()));
let db = CacheDB::new(Arc::new(alloy_db));

// Define trevm extension, if any
Expand Down
Loading