Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ homepage = "https://github.com/base/tips"
repository = "https://github.com/base/tips"

[workspace]
members = ["crates/datastore", "crates/audit", "crates/ingress-rpc", "crates/maintenance", "crates/ingress-writer"]
members = ["crates/datastore", "crates/audit", "crates/ingress-rpc", "crates/maintenance", "crates/ingress-writer", "crates/common"]
resolver = "2"

[workspace.dependencies]
tips-datastore = { path = "crates/datastore" }
tips-audit = { path = "crates/audit" }
tips-maintenance = { path = "crates/maintenance" }
tips-ingress-writer = { path = "crates/ingress-writer" }
tips-common = { path = "crates/common" }

# Reth
reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" }
Expand Down Expand Up @@ -59,6 +60,7 @@ dotenvy = "0.15.7"
testcontainers = { version = "0.23.1", features = ["blocking"] }
testcontainers-modules = { version = "0.11.2", features = ["postgres", "kafka", "minio"] }
jsonrpsee = { version = "0.26.0", features = ["server", "macros"] }
chrono = { version = "0.4.42", features = ["serde"] }

# Kafka and S3 dependencies
rdkafka = { version = "0.37.0", features = ["libz-static", "ssl-vendored"] }
Expand Down
19 changes: 19 additions & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "tips-common"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true

[dependencies]
alloy-rpc-types-mev.workspace = true
alloy-primitives.workspace = true
sqlx.workspace = true
anyhow.workspace = true
op-alloy-consensus.workspace = true
alloy-provider.workspace = true
alloy-consensus.workspace = true
serde.workspace = true
chrono.workspace = true
64 changes: 64 additions & 0 deletions crates/common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use alloy_consensus::Transaction;
use alloy_consensus::transaction::SignerRecoverable;
use alloy_primitives::{Address, TxHash};
use alloy_provider::network::eip2718::Decodable2718;
use alloy_rpc_types_mev::EthSendBundle;
use anyhow::Result;
use chrono::{DateTime, Utc};
use op_alloy_consensus::OpTxEnvelope;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, sqlx::Type, Serialize, Deserialize)]
#[sqlx(type_name = "bundle_state", rename_all = "PascalCase")]
pub enum BundleState {
Ready,
IncludedByBuilder,
}

/// Extended bundle data that includes the original bundle plus extracted metadata
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BundleWithMetadata {
pub bundle: EthSendBundle,
pub txn_hashes: Vec<TxHash>,
pub senders: Vec<Address>,
pub min_base_fee: i64,
pub state: BundleState,
pub state_changed_at: DateTime<Utc>,
}

impl BundleWithMetadata {
pub fn new(bundle: &EthSendBundle) -> Result<Self> {
let mut senders = Vec::new();
let mut txn_hashes = Vec::new();

let mut min_base_fee = i64::MAX;

for tx_bytes in &bundle.txs {
let envelope = OpTxEnvelope::decode_2718_exact(tx_bytes)?;
txn_hashes.push(*envelope.hash());

let sender = match envelope.recover_signer() {
Ok(signer) => signer,
Err(err) => return Err(err.into()),
};

senders.push(sender);
min_base_fee = min_base_fee.min(envelope.max_fee_per_gas() as i64); // todo type and todo not right
}

let minimum_base_fee = if min_base_fee == i64::MAX {
0
} else {
min_base_fee
};

Ok(Self {
bundle: bundle.clone(),
txn_hashes,
senders,
min_base_fee: minimum_base_fee,
state: BundleState::Ready,
state_changed_at: Utc::now(),
})
}
}
5 changes: 1 addition & 4 deletions crates/datastore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@ anyhow.workspace = true
async-trait.workspace = true
alloy-rpc-types-mev.workspace = true
alloy-primitives.workspace = true
alloy-consensus.workspace = true
alloy-provider.workspace = true
op-alloy-consensus.workspace = true
eyre.workspace = true
tracing.workspace = true
tips-common.workspace = true

[dev-dependencies]
testcontainers.workspace = true
Expand Down
77 changes: 19 additions & 58 deletions crates/datastore/src/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
use crate::traits::BundleDatastore;
use alloy_consensus::Transaction;
use alloy_consensus::transaction::SignerRecoverable;
use alloy_primitives::hex::{FromHex, ToHexExt};
use alloy_primitives::{Address, B256, TxHash};
use alloy_provider::network::eip2718::Decodable2718;
use alloy_rpc_types_mev::EthSendBundle;
use anyhow::Result;
use op_alloy_consensus::OpTxEnvelope;
use sqlx::{
PgPool,
types::chrono::{DateTime, Utc},
};
use tracing::info;
use uuid::Uuid;

#[derive(Debug, Clone, sqlx::Type)]
#[sqlx(type_name = "bundle_state", rename_all = "PascalCase")]
pub enum BundleState {
Ready,
IncludedByBuilder,
}
use tips_common::{BundleState, BundleWithMetadata};

#[derive(sqlx::FromRow, Debug)]
struct BundleRow {
Expand Down Expand Up @@ -85,17 +76,6 @@ impl BundleFilter {
}
}

/// Extended bundle data that includes the original bundle plus extracted metadata
#[derive(Debug, Clone)]
pub struct BundleWithMetadata {
pub bundle: EthSendBundle,
pub txn_hashes: Vec<TxHash>,
pub senders: Vec<Address>,
pub min_base_fee: i64,
pub state: BundleState,
pub state_changed_at: DateTime<Utc>,
}

/// Statistics about bundles and transactions grouped by state
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BundleStats {
Expand Down Expand Up @@ -215,57 +195,38 @@ impl PostgresDatastore {
state_changed_at: row.state_changed_at,
})
}

fn extract_bundle_metadata(
&self,
bundle: &EthSendBundle,
) -> Result<(Vec<String>, i64, Vec<String>)> {
let mut senders = Vec::new();
let mut txn_hashes = Vec::new();

let mut min_base_fee = i64::MAX;

for tx_bytes in &bundle.txs {
let envelope = OpTxEnvelope::decode_2718_exact(tx_bytes)?;
txn_hashes.push(envelope.hash().encode_hex_with_prefix());

let sender = match envelope.recover_signer() {
Ok(signer) => signer,
Err(err) => return Err(err.into()),
};

senders.push(sender.encode_hex_with_prefix());
min_base_fee = min_base_fee.min(envelope.max_fee_per_gas() as i64); // todo type and todo not right
}

let minimum_base_fee = if min_base_fee == i64::MAX {
0
} else {
min_base_fee
};

Ok((senders, minimum_base_fee, txn_hashes))
}
}

#[async_trait::async_trait]
impl BundleDatastore for PostgresDatastore {
async fn insert_bundle(&self, bundle: EthSendBundle) -> Result<Uuid> {
async fn insert_bundle(&self, bundle: BundleWithMetadata) -> Result<Uuid> {
let id = Uuid::new_v4();

let (senders, minimum_base_fee, txn_hashes) = self.extract_bundle_metadata(&bundle)?;
let senders: Vec<String> = bundle
.senders
.iter()
.map(|s| s.encode_hex_with_prefix())
.collect();
let txn_hashes: Vec<String> = bundle
.txn_hashes
.iter()
.map(|h| h.encode_hex_with_prefix())
.collect();

let txs: Vec<String> = bundle
.bundle
.txs
.iter()
.map(|tx| tx.encode_hex_upper_with_prefix())
.collect();
let reverting_tx_hashes: Vec<String> = bundle
.bundle
.reverting_tx_hashes
.iter()
.map(|h| h.encode_hex_with_prefix())
.collect();
let dropping_tx_hashes: Vec<String> = bundle
.bundle
.dropping_tx_hashes
.iter()
.map(|h| h.encode_hex_with_prefix())
Expand All @@ -284,14 +245,14 @@ impl BundleDatastore for PostgresDatastore {
id,
BundleState::Ready as BundleState,
&senders,
minimum_base_fee,
bundle.min_base_fee,
&txn_hashes,
&txs,
&reverting_tx_hashes,
&dropping_tx_hashes,
bundle.block_number as i64,
bundle.min_timestamp.map(|t| t as i64),
bundle.max_timestamp.map(|t| t as i64),
bundle.bundle.block_number as i64,
bundle.bundle.min_timestamp.map(|t| t as i64),
bundle.bundle.max_timestamp.map(|t| t as i64),
)
.execute(&self.pool)
.await?;
Expand Down
9 changes: 4 additions & 5 deletions crates/datastore/src/traits.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
use crate::postgres::{
BlockInfo, BlockInfoUpdate, BundleFilter, BundleState, BundleStats, BundleWithMetadata,
};
use crate::postgres::{BlockInfo, BlockInfoUpdate, BundleFilter, BundleStats};
use alloy_primitives::TxHash;
use alloy_rpc_types_mev::EthSendBundle;
use anyhow::Result;
use sqlx::types::chrono::{DateTime, Utc};
use uuid::Uuid;

use tips_common::{BundleState, BundleWithMetadata};

/// Trait defining the interface for bundle datastore operations
#[async_trait::async_trait]
pub trait BundleDatastore: Send + Sync {
/// Insert a new bundle into the datastore
async fn insert_bundle(&self, bundle: EthSendBundle) -> Result<Uuid>;
async fn insert_bundle(&self, bundle: BundleWithMetadata) -> Result<Uuid>;

/// Fetch a bundle with metadata by its ID
async fn get_bundle(&self, id: Uuid) -> Result<Option<BundleWithMetadata>>;
Expand Down
Loading