Skip to content

Commit 0f17597

Browse files
committed
adds simulation factory to builder
- adds initial implementation of simulator task to the builder
1 parent 0e77b32 commit 0f17597

File tree

6 files changed

+430
-0
lines changed

6 files changed

+430
-0
lines changed

Cargo.toml

+7
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ zenith-types = "0.15"
2828

2929
alloy = { version = "=0.11.1", features = ["full", "json-rpc", "signer-aws", "rpc-types-mev", "rlp", "node-bindings"] }
3030

31+
trevm = { version = "0.19.12", features = [ "concurrent-db" ]}
32+
revm = { version = "19.6.0", features = [ "alloydb" ]}
33+
34+
# HACK: Update these to use main alloy package
35+
alloy-provider = { version = "0.7.3" }
36+
alloy-eips = { version = "0.7.3" }
37+
3138
aws-config = "1.1.7"
3239
aws-sdk-kms = "1.15.0"
3340

src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,7 @@ pub mod tasks;
2727
/// Utilities.
2828
pub mod utils;
2929

30+
use alloy_eips as _;
31+
use alloy_provider as _;
3032
/// Anonymous crate dependency imports.
3133
use openssl as _;

src/tasks/block.rs

+5
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ impl InProgressBlock {
4242
self.transactions.is_empty()
4343
}
4444

45+
/// Returns the current list of transactions included in this block
46+
pub fn transactions(&self) -> Vec<TxEnvelope> {
47+
self.transactions.clone()
48+
}
49+
4550
/// Unseal the block
4651
fn unseal(&mut self) {
4752
self.raw_encoding.take();

src/tasks/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,6 @@ pub mod submit;
1515

1616
/// Tx polling task
1717
pub mod tx_poller;
18+
19+
/// Tx and bundle simulation task
20+
pub mod simulator;

src/tasks/simulator.rs

+316
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
use crate::tasks::block::InProgressBlock;
2+
use alloy::consensus::TxEnvelope;
3+
use alloy::primitives::U256;
4+
use eyre::Result;
5+
use revm::{db::CacheDB, primitives::CfgEnv, DatabaseRef};
6+
use std::{convert::Infallible, sync::Arc};
7+
use tokio::{select, sync::mpsc::UnboundedReceiver, task::JoinSet};
8+
use trevm::{
9+
db::sync::{ConcurrentState, ConcurrentStateInfo},
10+
revm::{
11+
primitives::{EVMError, ResultAndState},
12+
Database, DatabaseCommit, EvmBuilder,
13+
},
14+
BlockDriver, Cfg, DbConnect, EvmFactory, NoopBlock, TrevmBuilder, Tx,
15+
};
16+
17+
/// Tracks the EVM state, score, and result of an EVM execution.
18+
/// Scores are assigned by the evaluation function, and are Ord
19+
/// or PartialOrd to allow for sorting.
20+
#[derive(Debug, Clone)]
21+
pub struct Best<T, S: PartialOrd + Ord = U256> {
22+
/// The transaction being executed.
23+
pub tx: Arc<T>,
24+
/// The result and state of the execution.
25+
pub result: ResultAndState,
26+
/// The score calculated by the evaluation function.
27+
pub score: S,
28+
}
29+
30+
/// Binds a database and an extension together.
31+
#[derive(Debug, Clone)]
32+
pub struct SimulatorFactory<Db, Ext> {
33+
/// The database state the execution is carried out on.
34+
pub db: Db,
35+
/// The extension, if any, provided to the trevm instance.
36+
pub ext: Ext,
37+
}
38+
39+
/// SimResult is an [`Option`] type that holds a tuple of a transaction and its associated
40+
/// state as a [`Db`] type updates if it was successfully executed.
41+
type SimResult<Db> = Option<(Best<TxEnvelope>, ConcurrentState<Arc<ConcurrentState<Db>>>)>;
42+
43+
impl<Db, Ext> SimulatorFactory<Db, Ext>
44+
where
45+
Ext: Send + Sync + Clone + 'static,
46+
Db: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static,
47+
{
48+
/// Creates a new Simulator factory out of the database and extension.
49+
pub const fn new(db: Db, ext: Ext) -> Self {
50+
Self { db, ext }
51+
}
52+
53+
/// Spawns a trevm simulator that runs until `deadline` is hit.
54+
/// * Spawn does not guarantee that a thread is finished before the deadline.
55+
/// * This is intentional, so that it can maximize simulation time before the deadline.
56+
/// * This function always returns whatever the latest finished in progress block is.
57+
pub fn spawn<T, F>(
58+
self,
59+
mut inbound_tx: UnboundedReceiver<Arc<TxEnvelope>>,
60+
_inbound_bundle: UnboundedReceiver<Arc<Vec<TxEnvelope>>>,
61+
evaluator: Arc<F>,
62+
deadline: tokio::time::Instant,
63+
) -> tokio::task::JoinHandle<InProgressBlock>
64+
where
65+
T: Tx,
66+
F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static,
67+
{
68+
tokio::spawn(async move {
69+
// Spawn a join set to track all simulation threads
70+
let mut join_set = JoinSet::new();
71+
72+
let mut best: Option<Best<TxEnvelope>> = None;
73+
74+
let mut block = InProgressBlock::new();
75+
76+
let sleep = tokio::time::sleep_until(deadline);
77+
tokio::pin!(sleep);
78+
79+
loop {
80+
select! {
81+
_ = &mut sleep => break,
82+
// Handle incoming
83+
tx = inbound_tx.recv() => {
84+
if let Some(inbound_tx) = tx {
85+
// Setup the simulation environment
86+
let sim = self.clone();
87+
let eval = evaluator.clone();
88+
let mut parent_db = Arc::new(sim.connect().unwrap());
89+
90+
// Kick off the work in a new thread
91+
join_set.spawn(async move {
92+
let result = sim.simulate_tx(inbound_tx, eval, parent_db.child());
93+
94+
if let Some((best, db)) = result {
95+
if let Ok(()) = parent_db.merge_child(db) {
96+
tracing::debug!("merging updated simulation state");
97+
return Some(best)
98+
}
99+
tracing::error!("failed to update simulation state");
100+
None
101+
} else {
102+
None
103+
}
104+
});
105+
}
106+
}
107+
Some(result) = join_set.join_next() => {
108+
match result {
109+
Ok(Some(candidate)) => {
110+
tracing::info!(tx_hash = ?candidate.tx.tx_hash(), "ingesting transaction");
111+
block.ingest_tx(candidate.tx.as_ref());
112+
113+
if candidate.score > best.as_ref().map(|b| b.score).unwrap_or_default() {
114+
tracing::info!(score = ?candidate.score, "new best candidate found");
115+
best = Some(candidate);
116+
}
117+
}
118+
Ok(None) => {
119+
tracing::debug!("simulation returned no result");
120+
}
121+
Err(e) => {
122+
tracing::error!("simulation task failed: {}", e);
123+
}
124+
}
125+
}
126+
else => break,
127+
}
128+
}
129+
130+
block
131+
})
132+
}
133+
134+
/// Simulates an inbound tx and applies its state if it's successfully simualted
135+
pub fn simulate_tx<F>(
136+
self,
137+
tx: Arc<TxEnvelope>,
138+
evaluator: Arc<F>,
139+
db: ConcurrentState<Arc<ConcurrentState<Db>>>,
140+
) -> SimResult<Db>
141+
where
142+
F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static,
143+
Db: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static,
144+
{
145+
let trevm_instance = EvmBuilder::default().with_db(db).build_trevm();
146+
147+
let result = trevm_instance
148+
.fill_cfg(&PecorinoCfg)
149+
.fill_block(&NoopBlock)
150+
.fill_tx(tx.as_ref()) // Use as_ref() to get &SimTxEnvelope from Arc
151+
.run();
152+
153+
match result {
154+
Ok(t) => {
155+
// log and evaluate simulation results
156+
tracing::info!(tx_hash = ?tx.tx_hash(), "transaction simulated");
157+
let result = t.result_and_state().clone();
158+
tracing::debug!(gas_used = &result.result.gas_used(), "gas consumed");
159+
let score = evaluator(&result);
160+
tracing::debug!(score = ?score, "transaction evaluated");
161+
162+
// accept results
163+
let t = t.accept();
164+
let db = t.1.into_db();
165+
166+
// return the updated db with the candidate applied to its state
167+
Some((Best { tx, result, score }, db))
168+
}
169+
Err(e) => {
170+
// if this transaction fails to run, log the error and return None
171+
tracing::error!(err = ?e.as_transaction_error(), "failed to simulate tx");
172+
None
173+
}
174+
}
175+
}
176+
177+
/// Simulates an inbound bundle and applies its state if it's successfully simulated
178+
pub fn simulate_bundle<T, F>(
179+
&self,
180+
_bundle: Arc<Vec<T>>,
181+
_evaluator: Arc<F>,
182+
_trevm_instance: trevm::EvmNeedsCfg<'_, (), ConcurrentState<CacheDB<Arc<Db>>>>,
183+
) -> Option<Best<Vec<T>>>
184+
where
185+
T: Tx + Send + Sync + 'static,
186+
F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static,
187+
{
188+
todo!("implement bundle handling")
189+
}
190+
}
191+
192+
/// Wraps a Db into an EvmFactory compatible [`Database`]
193+
impl<'a, Db, Ext> DbConnect<'a> for SimulatorFactory<Db, Ext>
194+
where
195+
Db: Database + DatabaseRef + DatabaseCommit + Sync + Send + Clone + 'static,
196+
Ext: Sync + Clone,
197+
{
198+
type Database = ConcurrentState<Db>;
199+
type Error = Infallible;
200+
201+
fn connect(&'a self) -> Result<Self::Database, Self::Error> {
202+
let inner = ConcurrentState::new(self.db.clone(), ConcurrentStateInfo::default());
203+
Ok(inner)
204+
}
205+
}
206+
207+
/// Makes a SimulatorFactory capable of creating and configuring trevm instances
208+
impl<'a, Db, Ext> EvmFactory<'a> for SimulatorFactory<Db, Ext>
209+
where
210+
Db: Database + DatabaseRef + DatabaseCommit + Sync + Send + Clone + 'static,
211+
Ext: Sync + Clone,
212+
{
213+
type Ext = ();
214+
215+
/// Create makes a [`ConcurrentState`] database by calling connect
216+
fn create(&'a self) -> Result<trevm::EvmNeedsCfg<'a, Self::Ext, Self::Database>, Self::Error> {
217+
let db = self.connect()?;
218+
let trevm = trevm::revm::EvmBuilder::default().with_db(db).build_trevm();
219+
Ok(trevm)
220+
}
221+
}
222+
223+
/// A trait for extracting transactions from
224+
pub trait BlockExtractor<Ext, Db: Database + DatabaseCommit>: Send + Sync + 'static {
225+
/// BlockDriver runs the transactions over the provided trevm instance.
226+
type Driver: BlockDriver<Ext, Error<Db>: core::error::Error>;
227+
228+
/// Instantiate an configure a new [`trevm`] instance.
229+
fn trevm(&self, db: Db) -> trevm::EvmNeedsBlock<'static, Ext, Db>;
230+
231+
/// Extracts transactions from the source.
232+
///
233+
/// Extraction is infallible. Worst case it should return a no-op driver.
234+
fn extract(&mut self, bytes: &[u8]) -> Self::Driver;
235+
}
236+
237+
impl<Ext> BlockDriver<Ext> for InProgressBlock {
238+
type Block = NoopBlock;
239+
240+
type Error<Db: Database + DatabaseCommit> = Error<Db>;
241+
242+
fn block(&self) -> &Self::Block {
243+
&NoopBlock
244+
}
245+
246+
/// Loops through the transactions in the block and runs them, accepting the state at the end
247+
/// if it was successful and returning and erroring out otherwise.
248+
fn run_txns<'a, Db: Database + DatabaseCommit>(
249+
&mut self,
250+
mut trevm: trevm::EvmNeedsTx<'a, Ext, Db>,
251+
) -> trevm::RunTxResult<'a, Ext, Db, Self> {
252+
for tx in self.transactions().iter() {
253+
if tx.recover_signer().is_ok() {
254+
let sender = tx.recover_signer().unwrap();
255+
tracing::info!(sender = ?sender, tx_hash = ?tx.tx_hash(), "simulating transaction");
256+
257+
let t = match trevm.run_tx(tx) {
258+
Ok(t) => t,
259+
Err(e) => {
260+
if e.is_transaction_error() {
261+
return Ok(e.discard_error());
262+
} else {
263+
return Err(e.err_into());
264+
}
265+
}
266+
};
267+
268+
(_, trevm) = t.accept();
269+
}
270+
}
271+
Ok(trevm)
272+
}
273+
274+
fn post_block<Db: Database + DatabaseCommit>(
275+
&mut self,
276+
_trevm: &trevm::EvmNeedsBlock<'_, Ext, Db>,
277+
) -> Result<(), Self::Error<Db>> {
278+
Ok(())
279+
}
280+
}
281+
282+
/// Defines the CfgEnv for Pecorino Network
283+
#[derive(Debug, Clone, Copy)]
284+
pub struct PecorinoCfg;
285+
286+
impl Cfg for PecorinoCfg {
287+
fn fill_cfg_env(&self, cfg_env: &mut CfgEnv) {
288+
cfg_env.chain_id = 17003;
289+
}
290+
}
291+
292+
/// Wrap the EVM error in a database error type
293+
pub struct Error<Db: Database>(EVMError<Db::Error>);
294+
295+
impl<Db> From<EVMError<Db::Error>> for Error<Db>
296+
where
297+
Db: Database,
298+
{
299+
fn from(e: EVMError<Db::Error>) -> Self {
300+
Self(e)
301+
}
302+
}
303+
304+
impl<Db: Database> core::error::Error for Error<Db> {}
305+
306+
impl<Db: Database> core::fmt::Debug for Error<Db> {
307+
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
308+
write!(f, "Error")
309+
}
310+
}
311+
312+
impl<Db: Database> core::fmt::Display for Error<Db> {
313+
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
314+
write!(f, "Error")
315+
}
316+
}

0 commit comments

Comments
 (0)