Skip to content

Commit 2ca9df1

Browse files
committed
WIP: Updating to alloy at 0.12
1 parent 567442b commit 2ca9df1

File tree

6 files changed

+114
-111
lines changed

6 files changed

+114
-111
lines changed

Cargo.toml

+1-5
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,7 @@ zenith-types = "0.15"
2929
alloy = { version = "=0.11.1", features = ["full", "json-rpc", "signer-aws", "rpc-types-mev", "rlp", "node-bindings"] }
3030

3131
trevm = { version = "0.19.12", features = [ "concurrent-db" ]}
32-
revm = { version = "19.6.0", features = [ "alloydb" ]}
33-
34-
# HACK: Update these to use main alloy package
35-
alloy-provider = { version = "0.7.3" }
36-
alloy-eips = { version = "0.7.3" }
32+
revm = { git="https://github.com/bluealloy/revm.git", tag="v59", features = [ "alloydb" ]}
3733

3834
aws-config = "1.1.7"
3935
aws-sdk-kms = "1.15.0"

bin/builder.rs

+23-6
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,16 @@
33
use builder::config::BuilderConfig;
44
use builder::service::serve_builder_with_span;
55
use builder::tasks::block::BlockBuilder;
6+
use builder::tasks::bundler::BundlePoller;
67
use builder::tasks::metrics::MetricsTask;
78
use builder::tasks::oauth::Authenticator;
9+
use builder::tasks::simulator::eval_fn;
810
use builder::tasks::submit::SubmitTask;
911

10-
use builder::tasks::tx_poller;
12+
use builder::tasks::tx_poller::{self, TxPoller};
13+
use revm::primitives::U256;
1114
use tokio::select;
15+
use trevm::revm::primitives::ResultAndState;
1216

1317
#[tokio::main]
1418
async fn main() -> eyre::Result<()> {
@@ -17,17 +21,22 @@ async fn main() -> eyre::Result<()> {
1721
let span = tracing::info_span!("zenith-builder");
1822

1923
let config = BuilderConfig::load_from_env()?.clone();
24+
2025
let host_provider = config.connect_host_provider().await?;
2126
let ru_provider = config.connect_ru_provider().await?;
22-
let authenticator = Authenticator::new(&config);
23-
24-
tracing::debug!(rpc_url = config.host_rpc_url.as_ref(), "instantiated provider");
27+
tracing::info!(rpc_url = config.host_rpc_url.as_ref(), "instantiated provider");
2528

2629
let sequencer_signer = config.connect_sequencer_signer().await?;
2730
let zenith = config.connect_zenith(host_provider.clone());
31+
tracing::info!("instantiated zenith");
2832

2933
let metrics = MetricsTask { host_provider: host_provider.clone() };
3034
let (tx_channel, metrics_jh) = metrics.spawn();
35+
tracing::info!("instantiated zenith");
36+
37+
let authenticator = Authenticator::new(&config);
38+
let authenticator_jh = authenticator.spawn();
39+
tracing::info!("instantiated authenticator");
3140

3241
let builder = BlockBuilder::new(&config, authenticator.clone(), ru_provider.clone());
3342
let submit = SubmitTask {
@@ -39,10 +48,15 @@ async fn main() -> eyre::Result<()> {
3948
config: config.clone(),
4049
outbound_tx_channel: tx_channel,
4150
};
51+
52+
let tx_poller = TxPoller::new_with_poll_interval_ms(&config, 1000);
53+
let (tx_channel, tx_jh) = tx_poller.spawn();
54+
55+
let bundle_poller = BundlePoller::new_with_poll_interval_ms(&config, authenticator, 1);
56+
let bundle_channel = bundle_poller.spawn();
4257

43-
let authenticator_jh = authenticator.spawn();
4458
let (submit_channel, submit_jh) = submit.spawn();
45-
let build_jh = builder.spawn(submit_channel);
59+
let build_jh = builder.spawn(tx_channel, bundle_channel, submit_channel);
4660

4761
let port = config.builder_port;
4862
let server = serve_builder_with_span(([0, 0, 0, 0], port), span);
@@ -63,6 +77,9 @@ async fn main() -> eyre::Result<()> {
6377
_ = authenticator_jh => {
6478
tracing::info!("authenticator finished");
6579
}
80+
_ = tx_jh => {
81+
tracing::info!("tx-poller finished");
82+
}
6683
}
6784

6885
tracing::info!("shutting down");

src/lib.rs

-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,5 @@ pub mod tasks;
2727
/// Utilities.
2828
pub mod utils;
2929

30-
use alloy_eips as _;
31-
use alloy_provider as _;
3230
/// Anonymous crate dependency imports.
3331
use openssl as _;

src/tasks/block.rs

+47-60
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
1-
use super::bundler::{Bundle, BundlePoller};
2-
use super::oauth::Authenticator;
3-
use super::tx_poller::TxPoller;
41
use crate::config::{BuilderConfig, WalletlessProvider};
5-
use crate::tasks::simulator::SimulatorFactory;
2+
use crate::tasks::bundler::{Bundle, BundlePoller};
3+
use crate::tasks::oauth::Authenticator;
4+
use crate::tasks::simulator::{eval_fn, SimulatorFactory};
5+
use crate::tasks::tx_poller::TxPoller;
66
use alloy::{
77
consensus::{SidecarBuilder, SidecarCoder, TxEnvelope},
88
eips::eip2718::Decodable2718,
9+
network::Ethereum,
910
primitives::{keccak256, Bytes, B256},
10-
providers::Provider as _,
11+
providers::Provider,
1112
rlp::Buf,
1213
};
13-
use revm::db::{AlloyDB, CacheDB};
14-
use std::sync::Arc;
15-
use std::time::{SystemTime, UNIX_EPOCH};
16-
use std::{sync::OnceLock, time::Duration};
17-
use tokio::{sync::mpsc, task::JoinHandle};
14+
use revm::database::AlloyDB;
15+
use std::{
16+
sync::{Arc, OnceLock},
17+
time::Duration,
18+
time::{SystemTime, UNIX_EPOCH},
19+
};
20+
use tokio::{sync::mpsc, task::JoinHandle, time::Instant};
1821
use tracing::{debug, error, info, trace, Instrument};
1922
use zenith_types::{encode_txns, Alloy2718Coder, ZenithEthBundle};
2023

@@ -171,26 +174,7 @@ impl BlockBuilder {
171174
}
172175
}
173176

174-
/// Fetches bundles from the cache and ingests them into the in progress block
175-
async fn get_bundles(&mut self, in_progress: &mut InProgressBlock) {
176-
trace!("query bundles from cache");
177-
let bundles = self.bundle_poller.check_bundle_cache().await;
178-
match bundles {
179-
Ok(bundles) => {
180-
for bundle in bundles {
181-
match self.simulate_bundle(&bundle.bundle).await {
182-
Ok(()) => in_progress.ingest_bundle(bundle.clone()),
183-
Err(e) => error!(error = %e, id = ?bundle.id, "bundle simulation failed"),
184-
}
185-
}
186-
}
187-
Err(e) => {
188-
error!(error = %e, "error polling bundles");
189-
}
190-
}
191-
}
192-
193-
/// Simulates a Zenith bundle against the rollup state
177+
/// Simulates a Zenith bundle against the rollup state.
194178
async fn simulate_bundle(&mut self, bundle: &ZenithEthBundle) -> eyre::Result<()> {
195179
// TODO: Simulate bundles with the Simulation Engine
196180
// [ENG-672](https://linear.app/initiates/issue/ENG-672/add-support-for-bundles)
@@ -233,48 +217,51 @@ impl BlockBuilder {
233217

234218
/// Spawn the block builder task, returning the inbound channel to it, and
235219
/// a handle to the running task.
236-
pub fn spawn(mut self, outbound: mpsc::UnboundedSender<InProgressBlock>) -> JoinHandle<()> {
220+
pub fn spawn<E>(
221+
mut self,
222+
inbound_tx: mpsc::UnboundedReceiver<TxEnvelope>,
223+
inbound_bundle: mpsc::UnboundedReceiver<()>,
224+
outbound: mpsc::UnboundedSender<InProgressBlock>,
225+
) -> JoinHandle<()> {
237226
tokio::spawn(
238227
async move {
239228
loop {
240-
// sleep the buffer time
229+
// Sleep the buffer time during block wake up
241230
tokio::time::sleep(Duration::from_secs(self.secs_to_next_target())).await;
242231
info!("beginning block build cycle");
243232

244233
// Setup a simulator factory
245234
let ru_provider = self.ru_provider.clone();
246235
let latest = ru_provider.get_block_number().await.unwrap();
247-
let db = AlloyDB::new(
236+
let db: AlloyDB<Ethereum, WalletlessProvider> = AlloyDB::new(
248237
ru_provider.into(),
249-
alloy_eips::BlockId::Number(latest.into()),
238+
alloy::eips::BlockId::Number(latest.into()),
250239
);
251-
252-
// Calculate the simulation deadline
253-
let deadline = self.secs_to_next_target();
254-
255-
// Create a simulator instance
256-
if let Some(db) = db {
257-
let cache_db = CacheDB::new(Arc::new(db));
258-
let sim = SimulatorFactory::new(cache_db, ());
259-
260-
// TODO: Plumb the
261-
let in_progress =
262-
sim.spawn(inbound_tx, inbound_bundle, evaluator, deadline).await;
263-
outbound.send(in_progress);
264-
} else {
265-
todo!("handle failure to get a db")
266-
}
267-
268-
// submit the block if it has transactions
269-
if !in_progress.is_empty() {
270-
debug!(txns = in_progress.len(), "sending block to submit task");
271-
let in_progress_block = std::mem::take(&mut in_progress);
272-
if outbound.send(in_progress_block).is_err() {
273-
error!("downstream task gone");
274-
break;
240+
let sim = SimulatorFactory::new(db, ());
241+
242+
// Calculate the deadline
243+
let time_to_next_slot = self.secs_to_next_slot();
244+
let now = Instant::now();
245+
let deadline = now.checked_add(Duration::from_secs(time_to_next_slot)).unwrap();
246+
247+
// Run the simulation until the deadline
248+
let sim_result =
249+
sim.spawn(inbound_tx, inbound_bundle, Arc::new(eval_fn), deadline).await;
250+
251+
// Handle simulation results
252+
if let Ok(in_progress) = sim_result {
253+
if !in_progress.is_empty() {
254+
debug!(txns = in_progress.len(), "sending block to submit task");
255+
let in_progress_block = std::mem::take(&mut in_progress);
256+
257+
// Send the received block and error if there's an issue
258+
if outbound.send(in_progress_block).is_err() {
259+
error!("downstream task gone");
260+
break;
261+
}
262+
} else {
263+
debug!("no transactions, skipping block submission");
275264
}
276-
} else {
277-
debug!("no transactions, skipping block submission");
278265
}
279266
}
280267
}

src/tasks/simulator.rs

+28-23
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@ use crate::tasks::block::InProgressBlock;
22
use alloy::consensus::TxEnvelope;
33
use alloy::primitives::U256;
44
use eyre::Result;
5-
use revm::{db::CacheDB, primitives::CfgEnv, DatabaseRef};
5+
use revm::primitives::address;
66
use std::{convert::Infallible, sync::Arc};
7-
use tokio::{select, sync::mpsc::{Receiver, UnboundedReceiver}, task::JoinSet};
7+
use tokio::{select, sync::mpsc::Receiver, task::JoinSet};
8+
use tracing::debug;
89
use trevm::{
910
db::sync::{ConcurrentState, ConcurrentStateInfo},
1011
revm::{
11-
primitives::{EVMError, ResultAndState},
12-
Database, DatabaseCommit, EvmBuilder,
12+
primitives::{Account, CfgEnv, EVMError, ExecutionResult, ResultAndState},
13+
Database, DatabaseCommit, DatabaseRef, EvmBuilder,
1314
},
1415
BlockDriver, Cfg, DbConnect, EvmFactory, NoopBlock, TrevmBuilder, Tx,
1516
};
@@ -56,8 +57,8 @@ where
5657
/// * This function always returns whatever the latest finished in progress block is.
5758
pub fn spawn<T, F>(
5859
self,
59-
mut inbound_tx: Receiver<Arc<TxEnvelope>>,
60-
_inbound_bundle: Receiver<Arc<Vec<TxEnvelope>>>,
60+
mut inbound_tx: Receiver<TxEnvelope>,
61+
_inbound_bundle: Receiver<()>,
6162
evaluator: Arc<F>,
6263
deadline: tokio::time::Instant,
6364
) -> tokio::task::JoinHandle<InProgressBlock>
@@ -84,12 +85,12 @@ where
8485
if let Some(inbound_tx) = tx {
8586
// Setup the simulation environment
8687
let sim = self.clone();
87-
let eval = evaluator.clone();
8888
let mut parent_db = Arc::new(sim.connect().unwrap());
89+
let eval_fn = evaluator.clone();
8990

9091
// Kick off the work in a new thread
9192
join_set.spawn(async move {
92-
let result = sim.simulate_tx(inbound_tx, eval, parent_db.child());
93+
let result = sim.simulate_tx(inbound_tx, eval_fn, parent_db.child());
9394

9495
if let Some((best, db)) = result {
9596
if let Ok(()) = parent_db.merge_child(db) {
@@ -134,7 +135,7 @@ where
134135
/// Simulates an inbound tx and applies its state if it's successfully simualted
135136
pub fn simulate_tx<F>(
136137
self,
137-
tx: Arc<TxEnvelope>,
138+
tx: TxEnvelope,
138139
evaluator: Arc<F>,
139140
db: ConcurrentState<Arc<ConcurrentState<Db>>>,
140141
) -> SimResult<Db>
@@ -147,7 +148,7 @@ where
147148
let result = trevm_instance
148149
.fill_cfg(&PecorinoCfg)
149150
.fill_block(&NoopBlock)
150-
.fill_tx(tx.as_ref()) // Use as_ref() to get &SimTxEnvelope from Arc
151+
.fill_tx(&tx) // Use as_ref() to get &SimTxEnvelope from Arc
151152
.run();
152153

153154
match result {
@@ -164,7 +165,7 @@ where
164165
let db = t.1.into_db();
165166

166167
// return the updated db with the candidate applied to its state
167-
Some((Best { tx, result, score }, db))
168+
Some((Best { tx: tx.into(), result, score }, db))
168169
}
169170
Err(e) => {
170171
// if this transaction fails to run, log the error and return None
@@ -173,20 +174,24 @@ where
173174
}
174175
}
175176
}
177+
}
176178

177-
/// Simulates an inbound bundle and applies its state if it's successfully simulated
178-
pub fn simulate_bundle<T, F>(
179-
&self,
180-
_bundle: Arc<Vec<T>>,
181-
_evaluator: Arc<F>,
182-
_trevm_instance: trevm::EvmNeedsCfg<'_, (), ConcurrentState<CacheDB<Arc<Db>>>>,
183-
) -> Option<Best<Vec<T>>>
184-
where
185-
T: Tx + Send + Sync + 'static,
186-
F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static,
187-
{
188-
todo!("implement bundle handling")
179+
/// Simple evaluation function for builder scoring.
180+
pub fn eval_fn(state: &ResultAndState) -> U256 {
181+
// log the transaction results
182+
match &state.result {
183+
ExecutionResult::Success { .. } => debug!("execution successful"),
184+
ExecutionResult::Revert { .. } => debug!("execution reverted"),
185+
ExecutionResult::Halt { .. } => debug!("execution halted"),
189186
}
187+
188+
// return the target account balance
189+
let target_addr = address!("0x0000000000000000000000000000000000000000");
190+
let default_account = Account::default();
191+
let target_account = state.state.get(&target_addr).unwrap_or(&default_account);
192+
tracing::info!(balance = ?target_account.info.balance, "target account balance");
193+
194+
target_account.info.balance
190195
}
191196

192197
/// Wraps a Db into an EvmFactory compatible [`Database`]

tests/simulator_test.rs

+15-15
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,26 @@
1-
use alloy::consensus::{SignableTransaction as _, TxEip1559, TxEnvelope};
2-
use alloy::primitives::U256;
3-
use alloy::signers::local::PrivateKeySigner;
4-
use alloy::signers::SignerSync as _;
1+
use alloy::{
2+
consensus::{TxEnvelope, TxEip1559, SignableTransaction},
3+
eips::BlockId,
4+
primitives::U256,
5+
providers::{Provider, ProviderBuilder},
6+
signers::local::PrivateKeySigner,
7+
signers::SignerSync as _,
8+
};
59
use builder::tasks::simulator::SimulatorFactory;
6-
use revm::db::{AlloyDB, CacheDB};
7-
use revm::primitives::{address, TxKind};
10+
use revm::{
11+
database::{AlloyDB, CacheDB},
12+
primitives::{address, TxKind},
13+
};
814
use std::sync::Arc;
915
use tokio::sync::mpsc;
1016
use tokio::time::{Duration, Instant};
1117
use trevm::revm::primitives::{Account, ExecutionResult, ResultAndState};
1218

13-
// HACK: These have to be pinned to 0.7.3 because of revm version issues.
14-
// Once revm is updated, use the main alloy package again.
15-
use alloy_eips::BlockId;
16-
use alloy_provider::{Provider, ProviderBuilder};
17-
1819
#[tokio::test(flavor = "multi_thread")]
1920
async fn test_spawn() {
2021
// Setup transaction pipeline plumbing
21-
let (tx_sender, tx_receiver) = mpsc::unbounded_channel::<Arc<TxEnvelope>>();
22-
let (_bundle_sender, bundle_receiver) = mpsc::unbounded_channel::<Arc<Vec<TxEnvelope>>>();
22+
let (tx_sender, tx_receiver) = mpsc::unbounded_channel::<TxEnvelope>();
23+
let (_bundle_sender, bundle_receiver) = mpsc::unbounded_channel::<Vec<TxEnvelope>>();
2324
let deadline = Instant::now() + Duration::from_secs(2);
2425

2526
// Create a new anvil instance
@@ -35,8 +36,7 @@ async fn test_spawn() {
3536
let latest = root_provider.get_block_number().await.unwrap();
3637

3738
// Create an alloyDB from the provider at the latest height
38-
let alloy_db =
39-
AlloyDB::new(Arc::new(root_provider.clone()), BlockId::Number(latest.into())).unwrap();
39+
let alloy_db = AlloyDB::new(Arc::new(root_provider.clone()), BlockId::Number(latest.into()));
4040
let db = CacheDB::new(Arc::new(alloy_db));
4141

4242
// Define trevm extension, if any

0 commit comments

Comments
 (0)