Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/datastore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@ tracing.workspace = true
[dev-dependencies]
testcontainers.workspace = true
testcontainers-modules.workspace = true
alloy-consensus.workspace = true
alloy-signer-local.workspace = true
op-alloy-network.workspace = true
49 changes: 49 additions & 0 deletions crates/datastore/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,55 @@ impl BundleDatastore for PostgresDatastore {
.map(|h| h.encode_hex_with_prefix())
.collect();

// Check to see if there's an existing bundle with the same bundle hash. We can
// just check if `txs` is the exact same, since it will keccak in the same order.
// In SQL, comparing arrays also compares the order of the elements.
let existing_bundle = sqlx::query_as::<_, BundleRow>(
r#"
SELECT id, senders, minimum_base_fee, txn_hashes, txs, reverting_tx_hashes,
dropping_tx_hashes, block_number, min_timestamp, max_timestamp, bundle_state, state_changed_at
FROM bundles
WHERE txs = $1
LIMIT 1
"#,
)
.bind(&txs)
.fetch_optional(&self.pool)
.await?;

if let Some(existing_bundle) = existing_bundle {
let bundle_id = existing_bundle.id;
let existing_bundle_with_metadata =
self.row_to_bundle_with_metadata(existing_bundle)?;

// make sure bundle hash is the same. the assumption is that since the bundle_hash is the same,
// all fields related to txs remain unchanged (i.e., txs, txn_hashes, reverting_tx_hashes, dropping_tx_hashes)
if existing_bundle_with_metadata.bundle.bundle_hash() == bundle.bundle_hash() {
let (_, min_base_fee, _) = self.extract_bundle_metadata(&bundle)?;

// for now, we naively update with the latest bundle values
sqlx::query!(
r#"
UPDATE bundles
SET bundle_state = $1, minimum_base_fee = $2,
block_number = $3, min_timestamp = $4,
max_timestamp = $5, updated_at = NOW(), state_changed_at = NOW()
WHERE id = $6
"#,
BundleState::Ready as BundleState,
min_base_fee,
bundle.block_number as i64,
bundle.min_timestamp.map(|t| t as i64),
bundle.max_timestamp.map(|t| t as i64),
bundle_id.clone()
)
.execute(&self.pool)
.await?;
}

return Ok(bundle_id);
}

sqlx::query!(
r#"
INSERT INTO bundles (
Expand Down
96 changes: 77 additions & 19 deletions crates/datastore/tests/datastore.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use alloy_primitives::{Address, Bytes, TxHash, address, b256, bytes};
use alloy_consensus::{SignableTransaction, TxEip1559};
use alloy_primitives::{Address, Bytes, TxHash, U256, address, b256, bytes};
use alloy_rpc_types_mev::EthSendBundle;
use alloy_signer_local::PrivateKeySigner;
use op_alloy_network::{TxSignerSync, eip2718::Encodable2718};
use sqlx::PgPool;
use sqlx::types::chrono::Utc;
use testcontainers_modules::{
Expand Down Expand Up @@ -58,9 +61,10 @@ fn create_test_bundle(
block_number: u64,
min_timestamp: Option<u64>,
max_timestamp: Option<u64>,
tx: Bytes,
) -> eyre::Result<EthSendBundle> {
Ok(EthSendBundle {
txs: vec![TX_DATA],
txs: vec![tx],
block_number,
min_timestamp,
max_timestamp,
Expand All @@ -74,6 +78,26 @@ fn create_test_bundle(
})
}

/// Builds and signs a random EIP-1559 value-transfer transaction, returning
/// its EIP-2718 envelope-encoded bytes.
///
/// Every call draws a fresh random signer, so each returned transaction (and
/// therefore each bundle built from it) has a distinct hash — handy for tests
/// that need non-colliding bundles.
fn generate_tx() -> Bytes {
    let signer = PrivateKeySigner::random();
    let mut tx = TxEip1559 {
        chain_id: 1,
        nonce: 0,
        gas_limit: 21000,
        max_fee_per_gas: 20000000000u128, // 20 gwei
        max_priority_fee_per_gas: 1000000000u128, // 1 gwei
        to: Address::ZERO.into(),
        value: U256::from(10000000000000u128),
        access_list: Default::default(),
        // Plain value transfer carries no calldata; `Bytes::new()` replaces
        // the redundant `bytes!("").clone()` of the original.
        input: Bytes::new(),
    };

    // Signing a locally built tx with a freshly generated key cannot fail in
    // practice, so unwrap keeps this test helper ergonomic.
    let signature = signer.sign_transaction_sync(&mut tx).unwrap();
    let encoded_tx = tx.into_signed(signature).encoded_2718();

    Bytes::from(encoded_tx)
}

#[tokio::test]
async fn insert_and_get() -> eyre::Result<()> {
let harness = setup_datastore().await?;
Expand Down Expand Up @@ -143,10 +167,10 @@ async fn insert_and_get() -> eyre::Result<()> {
async fn select_bundles_comprehensive() -> eyre::Result<()> {
let harness = setup_datastore().await?;

let bundle1 = create_test_bundle(100, Some(1000), Some(2000))?;
let bundle2 = create_test_bundle(200, Some(1500), Some(2500))?;
let bundle3 = create_test_bundle(300, None, None)?; // valid for all times
let bundle4 = create_test_bundle(0, Some(500), Some(3000))?; // valid for all blocks
let bundle1 = create_test_bundle(100, Some(1000), Some(2000), generate_tx())?;
let bundle2 = create_test_bundle(200, Some(1500), Some(2500), generate_tx())?;
let bundle3 = create_test_bundle(300, None, None, generate_tx())?; // valid for all times
let bundle4 = create_test_bundle(0, Some(500), Some(3000), generate_tx())?; // valid for all blocks

harness
.data_store
Expand Down Expand Up @@ -240,8 +264,8 @@ async fn select_bundles_comprehensive() -> eyre::Result<()> {
async fn cancel_bundle_workflow() -> eyre::Result<()> {
let harness = setup_datastore().await?;

let bundle1 = create_test_bundle(100, Some(1000), Some(2000))?;
let bundle2 = create_test_bundle(200, Some(1500), Some(2500))?;
let bundle1 = create_test_bundle(100, Some(1000), Some(2000), generate_tx())?;
let bundle2 = create_test_bundle(200, Some(1500), Some(2500), generate_tx())?;

let bundle1_id = harness
.data_store
Expand Down Expand Up @@ -337,8 +361,8 @@ async fn find_bundle_by_transaction_hash() -> eyre::Result<()> {
async fn remove_bundles() -> eyre::Result<()> {
let harness = setup_datastore().await?;

let bundle1 = create_test_bundle(100, None, None)?;
let bundle2 = create_test_bundle(200, None, None)?;
let bundle1 = create_test_bundle(100, None, None, generate_tx())?;
let bundle2 = create_test_bundle(200, None, None, generate_tx())?;

let id1 = harness.data_store.insert_bundle(bundle1).await.unwrap();
let id2 = harness.data_store.insert_bundle(bundle2).await.unwrap();
Expand All @@ -363,8 +387,8 @@ async fn remove_bundles() -> eyre::Result<()> {
async fn update_bundles_state() -> eyre::Result<()> {
let harness = setup_datastore().await?;

let bundle1 = create_test_bundle(100, None, None)?;
let bundle2 = create_test_bundle(200, None, None)?;
let bundle1 = create_test_bundle(100, None, None, generate_tx())?;
let bundle2 = create_test_bundle(200, None, None, generate_tx())?;

let id1 = harness.data_store.insert_bundle(bundle1).await.unwrap();
let id2 = harness.data_store.insert_bundle(bundle2).await.unwrap();
Expand Down Expand Up @@ -450,8 +474,8 @@ async fn get_stats() -> eyre::Result<()> {
assert_eq!(stats.total_bundles, 0);
assert_eq!(stats.total_transactions, 0);

let bundle1 = create_test_bundle(100, None, None)?;
let bundle2 = create_test_bundle(200, None, None)?;
let bundle1 = create_test_bundle(100, None, None, generate_tx())?;
let bundle2 = create_test_bundle(200, None, None, generate_tx())?;

let id1 = harness.data_store.insert_bundle(bundle1).await.unwrap();
harness.data_store.insert_bundle(bundle2).await.unwrap();
Expand All @@ -478,9 +502,9 @@ async fn get_stats() -> eyre::Result<()> {
async fn remove_timed_out_bundles() -> eyre::Result<()> {
let harness = setup_datastore().await?;

let expired_bundle = create_test_bundle(100, None, Some(1000))?;
let valid_bundle = create_test_bundle(200, None, Some(2000))?;
let no_timestamp_bundle = create_test_bundle(300, None, None)?;
let expired_bundle = create_test_bundle(100, None, Some(1000), generate_tx())?;
let valid_bundle = create_test_bundle(200, None, Some(2000), generate_tx())?;
let no_timestamp_bundle = create_test_bundle(300, None, None, generate_tx())?;

harness
.data_store
Expand Down Expand Up @@ -519,8 +543,8 @@ async fn remove_timed_out_bundles() -> eyre::Result<()> {
async fn remove_old_included_bundles() -> eyre::Result<()> {
let harness = setup_datastore().await?;

let bundle1 = create_test_bundle(100, None, None)?;
let bundle2 = create_test_bundle(200, None, None)?;
let bundle1 = create_test_bundle(100, None, None, generate_tx())?;
let bundle2 = create_test_bundle(200, None, None, generate_tx())?;

let id1 = harness.data_store.insert_bundle(bundle1).await.unwrap();
let id2 = harness.data_store.insert_bundle(bundle2).await.unwrap();
Expand Down Expand Up @@ -552,3 +576,37 @@ async fn remove_old_included_bundles() -> eyre::Result<()> {

Ok(())
}

#[tokio::test]
async fn insert_bundle_with_same_bundle_hash() -> eyre::Result<()> {
    let harness = setup_datastore().await?;

    // Both bundles carry the identical signed transaction, so they hash to
    // the same bundle hash and should resolve to a single stored row.
    let shared_tx = generate_tx();

    let first = create_test_bundle(100, Some(1000), Some(2000), shared_tx.clone())?;
    let first_id = harness.data_store.insert_bundle(first).await.unwrap();
    let first_stored = harness
        .data_store
        .get_bundle(first_id)
        .await
        .unwrap()
        .unwrap();

    let second = create_test_bundle(100, Some(1500), Some(1800), shared_tx)?;
    let second_id = harness.data_store.insert_bundle(second).await.unwrap();
    let second_stored = harness
        .data_store
        .get_bundle(second_id)
        .await
        .unwrap()
        .unwrap();

    // The second insert must dedupe onto the existing row: same UUID, same
    // bundle hash.
    assert_eq!(first_id, second_id);
    assert_eq!(
        first_stored.bundle.bundle_hash(),
        second_stored.bundle.bundle_hash()
    );

    // Transaction contents and target block are untouched by the update…
    assert_eq!(first_stored.bundle.txs, second_stored.bundle.txs);
    assert_eq!(
        first_stored.bundle.block_number,
        second_stored.bundle.block_number
    );
    // …while the timestamp window reflects the most recent submission.
    assert_eq!(second_stored.bundle.min_timestamp, Some(1500));
    assert_eq!(second_stored.bundle.max_timestamp, Some(1800));

    Ok(())
}