Skip to content

Commit ced6aad

Browse files
committed
Insert/get methods
1 parent bb4fc3e commit ced6aad

File tree

9 files changed

+202
-29
lines changed

9 files changed

+202
-29
lines changed

.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/postgres

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,4 @@ Thumbs.db
2121

2222
# Backup files
2323
*~
24-
*.bak
24+
*.bak

CLAUDE.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## Notes
44
- Always run `just ci` before claiming a task is complete and fix any issues
55
- Use `just fix` to fix formatting and warnings
6+
- Only add comments when the implementation logic is unclear, e.g. do not write a comment such as "insert item into database" when the code is `db.insert(item)`
67
- Always add dependencies to the Cargo.toml in the root and reference them in the crate Cargo files
78
- Use https://crates.io/ to find dependency versions when adding new deps
89

Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,13 @@ tracing-subscriber = "0.3.20"
1616
anyhow = "1.0.99"
1717
clap = { version = "4.5.47", features = ["derive", "env"] }
1818
url = "2.5.7"
19-
sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "postgres", "uuid", "chrono", "migrate"] }
19+
sqlx = { version = "0.8.6", features = [
20+
"runtime-tokio-native-tls",
21+
"postgres",
22+
"uuid",
23+
"chrono",
24+
"json",
25+
]}
2026
uuid = { version = "1.18.1", features = ["v4", "serde"] }
2127
serde = { version = "1.0.219", features = ["derive"] }
2228
chrono = { version = "0.4.42", features = ["serde"] }
Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
-- Create bundles table
22
CREATE TABLE IF NOT EXISTS bundles (
33
id UUID PRIMARY KEY,
4+
5+
senders CHAR(42)[],
6+
minimum_base_fee BIGINT, -- todo validate it's large enough
7+
txn_hashes CHAR(66)[],
8+
49
txs TEXT[] NOT NULL,
5-
block_number BIGINT NOT NULL,
10+
reverting_tx_hashes CHAR(66)[],
11+
dropping_tx_hashes CHAR(66)[],
12+
13+
block_number BIGINT,
614
min_timestamp BIGINT,
715
max_timestamp BIGINT,
8-
reverting_tx_hashes TEXT[],
9-
replacement_uuid TEXT,
1016
created_at TIMESTAMPTZ NOT NULL,
1117
updated_at TIMESTAMPTZ NOT NULL
1218
);

crates/datastore/src/postgres.rs

Lines changed: 90 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
use crate::traits::BundleDatastore;
2+
use alloy_primitives::TxHash;
3+
use alloy_primitives::hex::{FromHex, ToHexExt};
24
use alloy_rpc_types_mev::EthSendBundle;
35
use anyhow::Result;
46
use sqlx::PgPool;
@@ -28,12 +30,96 @@ impl PostgresDatastore {
2830

2931
#[async_trait::async_trait]
3032
impl BundleDatastore for PostgresDatastore {
31-
async fn insert_bundle(&self, _bundle: EthSendBundle) -> Result<Uuid> {
32-
todo!()
33+
async fn insert_bundle(&self, bundle: EthSendBundle) -> Result<Uuid> {
34+
let id = Uuid::new_v4();
35+
36+
let txs: Vec<String> = bundle
37+
.txs
38+
.iter()
39+
.map(|tx| tx.encode_hex_upper_with_prefix())
40+
.collect();
41+
let reverting_tx_hashes: Vec<String> = bundle
42+
.reverting_tx_hashes
43+
.iter()
44+
.map(|h| h.encode_hex_with_prefix())
45+
.collect();
46+
let dropping_tx_hashes: Vec<String> = bundle
47+
.dropping_tx_hashes
48+
.iter()
49+
.map(|h| h.encode_hex_with_prefix())
50+
.collect();
51+
52+
sqlx::query!(
53+
r#"
54+
INSERT INTO bundles (
55+
id, txs, reverting_tx_hashes, dropping_tx_hashes,
56+
block_number, min_timestamp, max_timestamp,
57+
created_at, updated_at
58+
)
59+
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW(), NOW())
60+
"#,
61+
id,
62+
&txs,
63+
&reverting_tx_hashes,
64+
&dropping_tx_hashes,
65+
bundle.block_number as i64,
66+
bundle.min_timestamp.map(|t| t as i64),
67+
bundle.max_timestamp.map(|t| t as i64),
68+
)
69+
.execute(&self.pool)
70+
.await?;
71+
72+
Ok(id)
3373
}
3474

35-
async fn get_bundle(&self, _id: Uuid) -> Result<Option<EthSendBundle>> {
36-
todo!()
75+
async fn get_bundle(&self, id: Uuid) -> Result<Option<EthSendBundle>> {
76+
let result = sqlx::query!(
77+
r#"
78+
SELECT txs, reverting_tx_hashes, dropping_tx_hashes,
79+
block_number, min_timestamp, max_timestamp
80+
FROM bundles
81+
WHERE id = $1
82+
"#,
83+
id
84+
)
85+
.fetch_optional(&self.pool)
86+
.await?;
87+
88+
match result {
89+
Some(row) => {
90+
let txs: Result<Vec<alloy_primitives::Bytes>, _> =
91+
row.txs.into_iter().map(|tx_hex| tx_hex.parse()).collect();
92+
93+
let reverting_tx_hashes: Result<Vec<TxHash>, _> = row
94+
.reverting_tx_hashes
95+
.unwrap_or_default()
96+
.into_iter()
97+
.map(TxHash::from_hex)
98+
.collect();
99+
100+
let dropping_tx_hashes: Result<Vec<TxHash>, _> = row
101+
.dropping_tx_hashes
102+
.unwrap_or_default()
103+
.into_iter()
104+
.map(TxHash::from_hex)
105+
.collect();
106+
107+
Ok(Some(EthSendBundle {
108+
txs: txs?,
109+
block_number: row.block_number.unwrap_or(0) as u64,
110+
min_timestamp: row.min_timestamp.map(|t| t as u64),
111+
max_timestamp: row.max_timestamp.map(|t| t as u64),
112+
reverting_tx_hashes: reverting_tx_hashes?,
113+
replacement_uuid: None,
114+
dropping_tx_hashes: dropping_tx_hashes?,
115+
refund_percent: None,
116+
refund_recipient: None,
117+
refund_tx_hashes: Vec::new(),
118+
extra_fields: Default::default(),
119+
}))
120+
}
121+
None => Ok(None),
122+
}
37123
}
38124

39125
async fn cancel_bundle(&self, _id: Uuid) -> Result<()> {
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
use alloy_primitives::{Bytes, TxHash};
2+
use alloy_rpc_types_mev::EthSendBundle;
3+
use datastore::{PostgresDatastore, traits::BundleDatastore};
4+
use sqlx::PgPool;
5+
use testcontainers_modules::{
6+
postgres,
7+
testcontainers::{ContainerAsync, runners::AsyncRunner},
8+
};
9+
10+
struct TestHarness {
11+
_postgres_instance: ContainerAsync<postgres::Postgres>,
12+
data_store: PostgresDatastore,
13+
}
14+
15+
async fn setup_datastore() -> eyre::Result<TestHarness> {
16+
let postgres_instance = postgres::Postgres::default().start().await?;
17+
let connection_string = format!(
18+
"postgres://postgres:postgres@{}:{}/postgres",
19+
postgres_instance.get_host().await?,
20+
postgres_instance.get_host_port_ipv4(5432).await?
21+
);
22+
23+
let pool = PgPool::connect(&connection_string).await?;
24+
let data_store = PostgresDatastore::new(pool);
25+
26+
assert!(data_store.run_migrations().await.is_ok());
27+
Ok(TestHarness {
28+
_postgres_instance: postgres_instance,
29+
data_store,
30+
})
31+
}
32+
33+
#[tokio::test]
34+
async fn insert_and_get() -> eyre::Result<()> {
35+
let harness = setup_datastore().await?;
36+
let test_bundle = EthSendBundle {
37+
txs: vec![
38+
"0x02f86f0102843b9aca0085029e7822d68298f094d2c8e0b2e8f2a8e8f2a8e8f2a8e8f2a8e8f2a880b844a9059cbb000000000000000000000000d2c8e0b2e8f2a8e8f2a8e8f2a8e8f2a8e8f2a80000000000000000000000000000000000000000000000000de0b6b3a7640000c080a0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0a0fedcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210".parse::<Bytes>()?,
39+
],
40+
block_number: 12345,
41+
min_timestamp: Some(1640995200),
42+
max_timestamp: Some(1640995260),
43+
reverting_tx_hashes: vec![
44+
"0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef".parse::<TxHash>()?,
45+
],
46+
replacement_uuid: None,
47+
dropping_tx_hashes: vec![
48+
"0xfedcba0987654321fedcba0987654321fedcba0987654321fedcba0987654321".parse::<TxHash>()?,
49+
],
50+
refund_percent: None,
51+
refund_recipient: None,
52+
refund_tx_hashes: vec![],
53+
extra_fields: Default::default(),
54+
};
55+
56+
let insert_result = harness.data_store.insert_bundle(test_bundle.clone()).await;
57+
assert!(insert_result.is_ok());
58+
let bundle_id = insert_result.unwrap();
59+
60+
let query_result = harness.data_store.get_bundle(bundle_id).await;
61+
assert!(query_result.is_ok());
62+
let retrieved_bundle = query_result.unwrap();
63+
64+
assert!(retrieved_bundle.is_some(), "Bundle should be found");
65+
let retrieved_bundle = retrieved_bundle.unwrap();
66+
assert_eq!(retrieved_bundle.txs.len(), test_bundle.txs.len());
67+
assert_eq!(retrieved_bundle.block_number, test_bundle.block_number);
68+
assert_eq!(retrieved_bundle.min_timestamp, test_bundle.min_timestamp);
69+
assert_eq!(retrieved_bundle.max_timestamp, test_bundle.max_timestamp);
70+
assert_eq!(
71+
retrieved_bundle.reverting_tx_hashes.len(),
72+
test_bundle.reverting_tx_hashes.len()
73+
);
74+
assert_eq!(
75+
retrieved_bundle.dropping_tx_hashes.len(),
76+
test_bundle.dropping_tx_hashes.len()
77+
);
78+
79+
Ok(())
80+
}

crates/datastore/tests/integration_tests.rs

Lines changed: 0 additions & 20 deletions
This file was deleted.

justfile

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,19 @@
11
# Run all CI checks locally
22
ci: check test fmt clippy build
33

4+
db:
5+
#!/usr/bin/env bash
6+
set -euxo pipefail
7+
docker container stop tips-db
8+
docker container rm tips-db
9+
docker run -d --name tips-db -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres
10+
sleep 2
11+
for file in ./crates/datastore/migrations/*.sql; do
12+
echo $file
13+
psql -d postgres://postgres:postgres@localhost:5432/postgres -f $file
14+
done
15+
16+
417
create-migration name:
518
touch crates/datastore/migrations/$(date +%s)_{{ name }}.sql
619

0 commit comments

Comments
 (0)