Skip to content

Commit 489fa99

Browse files
committed
Select bundles
1 parent 6d3d807 commit 489fa99

File tree

3 files changed

+290
-58
lines changed

3 files changed

+290
-58
lines changed

crates/datastore/src/postgres.rs

Lines changed: 137 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,48 @@ use sqlx::PgPool;
1111
use tracing::info;
1212
use uuid::Uuid;
1313

14+
#[derive(sqlx::FromRow, Debug)]
15+
struct BundleRow {
16+
senders: Option<Vec<String>>,
17+
minimum_base_fee: Option<i64>,
18+
txn_hashes: Option<Vec<String>>,
19+
txs: Vec<String>,
20+
reverting_tx_hashes: Option<Vec<String>>,
21+
dropping_tx_hashes: Option<Vec<String>>,
22+
block_number: Option<i64>,
23+
min_timestamp: Option<i64>,
24+
max_timestamp: Option<i64>,
25+
}
26+
27+
/// Filter criteria for selecting bundles
28+
#[derive(Debug, Clone, Default)]
29+
pub struct BundleFilter {
30+
pub base_fee: Option<i64>,
31+
pub block_number: Option<u64>,
32+
pub timestamp: Option<u64>,
33+
}
34+
35+
impl BundleFilter {
36+
pub fn new() -> Self {
37+
Self::default()
38+
}
39+
40+
pub fn with_base_fee(mut self, base_fee: i64) -> Self {
41+
self.base_fee = Some(base_fee);
42+
self
43+
}
44+
45+
pub fn valid_for_block(mut self, block_number: u64) -> Self {
46+
self.block_number = Some(block_number);
47+
self
48+
}
49+
50+
pub fn valid_for_timestamp(mut self, timestamp: u64) -> Self {
51+
self.timestamp = Some(timestamp);
52+
self
53+
}
54+
}
55+
1456
/// Extended bundle data that includes the original bundle plus extracted metadata
1557
#[derive(Debug, Clone)]
1658
pub struct BundleWithMetadata {
@@ -42,6 +84,60 @@ impl PostgresDatastore {
4284
}
4385

4486
impl PostgresDatastore {
87+
fn row_to_bundle_with_metadata(&self, row: BundleRow) -> Result<BundleWithMetadata> {
88+
let parsed_txs: Result<Vec<alloy_primitives::Bytes>, _> =
89+
row.txs.into_iter().map(|tx_hex| tx_hex.parse()).collect();
90+
91+
let parsed_reverting_tx_hashes: Result<Vec<TxHash>, _> = row
92+
.reverting_tx_hashes
93+
.unwrap_or_default()
94+
.into_iter()
95+
.map(TxHash::from_hex)
96+
.collect();
97+
98+
let parsed_dropping_tx_hashes: Result<Vec<TxHash>, _> = row
99+
.dropping_tx_hashes
100+
.unwrap_or_default()
101+
.into_iter()
102+
.map(TxHash::from_hex)
103+
.collect();
104+
105+
let bundle = EthSendBundle {
106+
txs: parsed_txs?,
107+
block_number: row.block_number.unwrap_or(0) as u64,
108+
min_timestamp: row.min_timestamp.map(|t| t as u64),
109+
max_timestamp: row.max_timestamp.map(|t| t as u64),
110+
reverting_tx_hashes: parsed_reverting_tx_hashes?,
111+
replacement_uuid: None,
112+
dropping_tx_hashes: parsed_dropping_tx_hashes?,
113+
refund_percent: None,
114+
refund_recipient: None,
115+
refund_tx_hashes: Vec::new(),
116+
extra_fields: Default::default(),
117+
};
118+
119+
let parsed_txn_hashes: Result<Vec<TxHash>, _> = row
120+
.txn_hashes
121+
.unwrap_or_default()
122+
.into_iter()
123+
.map(TxHash::from_hex)
124+
.collect();
125+
126+
let parsed_senders: Result<Vec<Address>, _> = row
127+
.senders
128+
.unwrap_or_default()
129+
.into_iter()
130+
.map(Address::from_hex)
131+
.collect();
132+
133+
Ok(BundleWithMetadata {
134+
bundle,
135+
txn_hashes: parsed_txn_hashes?,
136+
senders: parsed_senders?,
137+
min_base_fee: row.minimum_base_fee.unwrap_or(0),
138+
})
139+
}
140+
45141
fn extract_bundle_metadata(
46142
&self,
47143
bundle: &EthSendBundle,
@@ -125,71 +221,22 @@ impl BundleDatastore for PostgresDatastore {
125221
}
126222

127223
async fn get_bundle(&self, id: Uuid) -> Result<Option<BundleWithMetadata>> {
128-
let result = sqlx::query!(
224+
let result = sqlx::query_as::<_, BundleRow>(
129225
r#"
130226
SELECT senders, minimum_base_fee, txn_hashes, txs, reverting_tx_hashes,
131227
dropping_tx_hashes, block_number, min_timestamp, max_timestamp
132228
FROM bundles
133229
WHERE id = $1
134230
"#,
135-
id
136231
)
232+
.bind(id)
137233
.fetch_optional(&self.pool)
138234
.await?;
139235

140236
match result {
141237
Some(row) => {
142-
let txs: Result<Vec<alloy_primitives::Bytes>, _> =
143-
row.txs.into_iter().map(|tx_hex| tx_hex.parse()).collect();
144-
145-
let reverting_tx_hashes: Result<Vec<TxHash>, _> = row
146-
.reverting_tx_hashes
147-
.unwrap_or_default()
148-
.into_iter()
149-
.map(TxHash::from_hex)
150-
.collect();
151-
152-
let dropping_tx_hashes: Result<Vec<TxHash>, _> = row
153-
.dropping_tx_hashes
154-
.unwrap_or_default()
155-
.into_iter()
156-
.map(TxHash::from_hex)
157-
.collect();
158-
159-
let bundle = EthSendBundle {
160-
txs: txs?,
161-
block_number: row.block_number.unwrap_or(0) as u64,
162-
min_timestamp: row.min_timestamp.map(|t| t as u64),
163-
max_timestamp: row.max_timestamp.map(|t| t as u64),
164-
reverting_tx_hashes: reverting_tx_hashes?,
165-
replacement_uuid: None,
166-
dropping_tx_hashes: dropping_tx_hashes?,
167-
refund_percent: None,
168-
refund_recipient: None,
169-
refund_tx_hashes: Vec::new(),
170-
extra_fields: Default::default(),
171-
};
172-
173-
let txn_hashes: Result<Vec<TxHash>, _> = row
174-
.txn_hashes
175-
.unwrap_or_default()
176-
.into_iter()
177-
.map(TxHash::from_hex)
178-
.collect();
179-
180-
let senders: Result<Vec<Address>, _> = row
181-
.senders
182-
.unwrap_or_default()
183-
.into_iter()
184-
.map(Address::from_hex)
185-
.collect();
186-
187-
Ok(Some(BundleWithMetadata {
188-
bundle,
189-
txn_hashes: txn_hashes?,
190-
senders: senders?,
191-
min_base_fee: row.minimum_base_fee.unwrap_or(0),
192-
}))
238+
let bundle_with_metadata = self.row_to_bundle_with_metadata(row)?;
239+
Ok(Some(bundle_with_metadata))
193240
}
194241
None => Ok(None),
195242
}
@@ -199,7 +246,42 @@ impl BundleDatastore for PostgresDatastore {
199246
todo!()
200247
}
201248

202-
async fn select_bundles(&self) -> Result<Vec<EthSendBundle>> {
203-
todo!()
249+
async fn select_bundles(&self, filter: BundleFilter) -> Result<Vec<BundleWithMetadata>> {
250+
let base_fee = filter.base_fee.unwrap_or(0);
251+
let block_number = filter.block_number.unwrap_or(0) as i64;
252+
253+
let (min_ts, max_ts) = if let Some(timestamp) = filter.timestamp {
254+
(timestamp as i64, timestamp as i64)
255+
} else {
256+
// If not specified, set the parameters to be the whole range
257+
(i64::MAX, 0i64)
258+
};
259+
260+
let rows = sqlx::query_as::<_, BundleRow>(
261+
r#"
262+
SELECT senders, minimum_base_fee, txn_hashes, txs, reverting_tx_hashes,
263+
dropping_tx_hashes, block_number, min_timestamp, max_timestamp
264+
FROM bundles
265+
WHERE minimum_base_fee >= $1
266+
AND (block_number = $2 OR block_number IS NULL OR block_number = 0 OR $2 = 0)
267+
AND (min_timestamp <= $3 OR min_timestamp IS NULL)
268+
AND (max_timestamp >= $4 OR max_timestamp IS NULL)
269+
ORDER BY minimum_base_fee DESC
270+
"#,
271+
)
272+
.bind(base_fee)
273+
.bind(block_number)
274+
.bind(min_ts)
275+
.bind(max_ts)
276+
.fetch_all(&self.pool)
277+
.await?;
278+
279+
let mut bundles = Vec::new();
280+
for row in rows {
281+
let bundle_with_metadata = self.row_to_bundle_with_metadata(row)?;
282+
bundles.push(bundle_with_metadata);
283+
}
284+
285+
Ok(bundles)
204286
}
205287
}

crates/datastore/src/traits.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::postgres::BundleWithMetadata;
1+
use crate::postgres::{BundleFilter, BundleWithMetadata};
22
use alloy_rpc_types_mev::EthSendBundle;
33
use anyhow::Result;
44
use uuid::Uuid;
@@ -16,5 +16,5 @@ pub trait BundleDatastore: Send + Sync {
1616
async fn cancel_bundle(&self, id: Uuid) -> Result<()>;
1717

1818
/// Select the candidate bundles to include in the next Flashblock
19-
async fn select_bundles(&self) -> Result<Vec<EthSendBundle>>;
19+
async fn select_bundles(&self, filter: BundleFilter) -> Result<Vec<BundleWithMetadata>>;
2020
}

0 commit comments

Comments
 (0)