diff --git a/Cargo.lock b/Cargo.lock index 4d6f59f5..bd70e207 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12492,7 +12492,7 @@ dependencies = [ ] [[package]] -name = "tips-datastore" +name = "tips-common" version = "0.1.0" dependencies = [ "alloy-consensus", @@ -12500,12 +12500,24 @@ dependencies = [ "alloy-provider", "alloy-rpc-types-mev", "anyhow", - "async-trait", - "eyre", + "chrono", "op-alloy-consensus", + "serde", + "sqlx", +] + +[[package]] +name = "tips-datastore" +version = "0.1.0" +dependencies = [ + "alloy-primitives", + "alloy-rpc-types-mev", + "anyhow", + "async-trait", "sqlx", "testcontainers", "testcontainers-modules", + "tips-common", "tokio", "tracing", "uuid", @@ -12534,6 +12546,7 @@ dependencies = [ "reth-rpc-eth-types", "revm-context-interface", "serde_json", + "tips-common", "tokio", "tracing", "tracing-subscriber 0.3.20", @@ -12544,7 +12557,6 @@ dependencies = [ name = "tips-ingress-writer" version = "0.1.0" dependencies = [ - "alloy-rpc-types-mev", "anyhow", "backon", "clap", @@ -12552,6 +12564,7 @@ dependencies = [ "rdkafka", "serde_json", "tips-audit", + "tips-common", "tips-datastore", "tokio", "tracing", @@ -12577,6 +12590,7 @@ dependencies = [ "rdkafka", "sqlx", "tips-audit", + "tips-common", "tips-datastore", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 9114e9f3..fd631ba9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ homepage = "https://github.com/base/tips" repository = "https://github.com/base/tips" [workspace] -members = ["crates/datastore", "crates/audit", "crates/ingress-rpc", "crates/maintenance", "crates/ingress-writer"] +members = ["crates/datastore", "crates/audit", "crates/ingress-rpc", "crates/maintenance", "crates/ingress-writer", "crates/common"] resolver = "2" [workspace.dependencies] @@ -15,6 +15,7 @@ tips-datastore = { path = "crates/datastore" } tips-audit = { path = "crates/audit" } tips-maintenance = { path = "crates/maintenance" } tips-ingress-writer = { path = "crates/ingress-writer" } +tips-common = { path = "crates/common" } # Reth reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } @@ -59,6 +60,7 @@ dotenvy = "0.15.7" testcontainers = { version = "0.23.1", features = ["blocking"] } testcontainers-modules = { version = "0.11.2", features = ["postgres", "kafka", "minio"] } jsonrpsee = { version = "0.26.0", features = ["server", "macros"] } +chrono = { version = "0.4.42", features = ["serde"] } # Kafka and S3 dependencies rdkafka = { version = "0.37.0", features = ["libz-static", "ssl-vendored"] } diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml new file mode 100644 index 00000000..c9b76c4c --- /dev/null +++ b/crates/common/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "tips-common" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[dependencies] +alloy-rpc-types-mev.workspace = true +alloy-primitives.workspace = true +sqlx.workspace = true +anyhow.workspace = true +op-alloy-consensus.workspace = true +alloy-provider.workspace = true +alloy-consensus.workspace = true +serde.workspace = true +chrono.workspace = true diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs new file mode 100644 index 00000000..fc3616e3 --- /dev/null +++ b/crates/common/src/lib.rs @@ -0,0 +1,64 @@ +use alloy_consensus::Transaction; +use alloy_consensus::transaction::SignerRecoverable; +use alloy_primitives::{Address, TxHash}; +use alloy_provider::network::eip2718::Decodable2718; +use alloy_rpc_types_mev::EthSendBundle; +use anyhow::Result; +use chrono::{DateTime, Utc}; +use op_alloy_consensus::OpTxEnvelope; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, sqlx::Type, Serialize, Deserialize)] +#[sqlx(type_name = "bundle_state", rename_all = "PascalCase")] +pub enum BundleState { + Ready, + IncludedByBuilder, +} + +/// Extended bundle data that includes the original bundle plus extracted metadata +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BundleWithMetadata { + pub bundle: EthSendBundle, + pub txn_hashes: Vec, + pub senders: Vec
, + pub min_base_fee: i64, + pub state: BundleState, + pub state_changed_at: DateTime, +} + +impl BundleWithMetadata { + pub fn new(bundle: &EthSendBundle) -> Result { + let mut senders = Vec::new(); + let mut txn_hashes = Vec::new(); + + let mut min_base_fee = i64::MAX; + + for tx_bytes in &bundle.txs { + let envelope = OpTxEnvelope::decode_2718_exact(tx_bytes)?; + txn_hashes.push(*envelope.hash()); + + let sender = match envelope.recover_signer() { + Ok(signer) => signer, + Err(err) => return Err(err.into()), + }; + + senders.push(sender); + min_base_fee = min_base_fee.min(envelope.max_fee_per_gas() as i64); // todo type and todo not right + } + + let minimum_base_fee = if min_base_fee == i64::MAX { + 0 + } else { + min_base_fee + }; + + Ok(Self { + bundle: bundle.clone(), + txn_hashes, + senders, + min_base_fee: minimum_base_fee, + state: BundleState::Ready, + state_changed_at: Utc::now(), + }) + } +} diff --git a/crates/datastore/Cargo.toml b/crates/datastore/Cargo.toml index ca6fd4a8..4687a3fe 100644 --- a/crates/datastore/Cargo.toml +++ b/crates/datastore/Cargo.toml @@ -15,11 +15,8 @@ anyhow.workspace = true async-trait.workspace = true alloy-rpc-types-mev.workspace = true alloy-primitives.workspace = true -alloy-consensus.workspace = true -alloy-provider.workspace = true -op-alloy-consensus.workspace = true -eyre.workspace = true tracing.workspace = true +tips-common.workspace = true [dev-dependencies] testcontainers.workspace = true diff --git a/crates/datastore/src/postgres.rs b/crates/datastore/src/postgres.rs index 92508012..700dbcb4 100644 --- a/crates/datastore/src/postgres.rs +++ b/crates/datastore/src/postgres.rs @@ -1,12 +1,8 @@ use crate::traits::BundleDatastore; -use alloy_consensus::Transaction; -use alloy_consensus::transaction::SignerRecoverable; use alloy_primitives::hex::{FromHex, ToHexExt}; use alloy_primitives::{Address, B256, TxHash}; -use alloy_provider::network::eip2718::Decodable2718; use alloy_rpc_types_mev::EthSendBundle; use anyhow::Result; -use op_alloy_consensus::OpTxEnvelope; use sqlx::{ PgPool, types::chrono::{DateTime, Utc}, @@ -14,12 +10,7 @@ use sqlx::{ use tracing::info; use uuid::Uuid; -#[derive(Debug, Clone, sqlx::Type)] -#[sqlx(type_name = "bundle_state", rename_all = "PascalCase")] -pub enum BundleState { - Ready, - IncludedByBuilder, -} +use tips_common::{BundleState, BundleWithMetadata}; #[derive(sqlx::FromRow, Debug)] struct BundleRow { @@ -85,17 +76,6 @@ impl BundleFilter { } } -/// Extended bundle data that includes the original bundle plus extracted metadata -#[derive(Debug, Clone)] -pub struct BundleWithMetadata { - pub bundle: EthSendBundle, - pub txn_hashes: Vec, - pub senders: Vec
, - pub min_base_fee: i64, - pub state: BundleState, - pub state_changed_at: DateTime, -} - /// Statistics about bundles and transactions grouped by state #[derive(Debug, Clone, PartialEq, Eq)] pub struct BundleStats { @@ -215,57 +195,38 @@ impl PostgresDatastore { state_changed_at: row.state_changed_at, }) } - - fn extract_bundle_metadata( - &self, - bundle: &EthSendBundle, - ) -> Result<(Vec, i64, Vec)> { - let mut senders = Vec::new(); - let mut txn_hashes = Vec::new(); - - let mut min_base_fee = i64::MAX; - - for tx_bytes in &bundle.txs { - let envelope = OpTxEnvelope::decode_2718_exact(tx_bytes)?; - txn_hashes.push(envelope.hash().encode_hex_with_prefix()); - - let sender = match envelope.recover_signer() { - Ok(signer) => signer, - Err(err) => return Err(err.into()), - }; - - senders.push(sender.encode_hex_with_prefix()); - min_base_fee = min_base_fee.min(envelope.max_fee_per_gas() as i64); // todo type and todo not right - } - - let minimum_base_fee = if min_base_fee == i64::MAX { - 0 - } else { - min_base_fee - }; - - Ok((senders, minimum_base_fee, txn_hashes)) - } } #[async_trait::async_trait] impl BundleDatastore for PostgresDatastore { - async fn insert_bundle(&self, bundle: EthSendBundle) -> Result { + async fn insert_bundle(&self, bundle: BundleWithMetadata) -> Result { let id = Uuid::new_v4(); - let (senders, minimum_base_fee, txn_hashes) = self.extract_bundle_metadata(&bundle)?; + let senders: Vec = bundle + .senders + .iter() + .map(|s| s.encode_hex_with_prefix()) + .collect(); + let txn_hashes: Vec = bundle + .txn_hashes + .iter() + .map(|h| h.encode_hex_with_prefix()) + .collect(); let txs: Vec = bundle + .bundle .txs .iter() .map(|tx| tx.encode_hex_upper_with_prefix()) .collect(); let reverting_tx_hashes: Vec = bundle + .bundle .reverting_tx_hashes .iter() .map(|h| h.encode_hex_with_prefix()) .collect(); let dropping_tx_hashes: Vec = bundle + .bundle .dropping_tx_hashes .iter() .map(|h| h.encode_hex_with_prefix()) @@ -284,14 +245,14 @@ impl BundleDatastore for PostgresDatastore { id, BundleState::Ready as BundleState, &senders, - minimum_base_fee, + bundle.min_base_fee, &txn_hashes, &txs, &reverting_tx_hashes, &dropping_tx_hashes, - bundle.block_number as i64, - bundle.min_timestamp.map(|t| t as i64), - bundle.max_timestamp.map(|t| t as i64), + bundle.bundle.block_number as i64, + bundle.bundle.min_timestamp.map(|t| t as i64), + bundle.bundle.max_timestamp.map(|t| t as i64), ) .execute(&self.pool) .await?; diff --git a/crates/datastore/src/traits.rs b/crates/datastore/src/traits.rs index f58b2d98..65a9390e 100644 --- a/crates/datastore/src/traits.rs +++ b/crates/datastore/src/traits.rs @@ -1,17 +1,16 @@ -use crate::postgres::{ - BlockInfo, BlockInfoUpdate, BundleFilter, BundleState, BundleStats, BundleWithMetadata, -}; +use crate::postgres::{BlockInfo, BlockInfoUpdate, BundleFilter, BundleStats}; use alloy_primitives::TxHash; -use alloy_rpc_types_mev::EthSendBundle; use anyhow::Result; use sqlx::types::chrono::{DateTime, Utc}; use uuid::Uuid; +use tips_common::{BundleState, BundleWithMetadata}; + /// Trait defining the interface for bundle datastore operations #[async_trait::async_trait] pub trait BundleDatastore: Send + Sync { /// Insert a new bundle into the datastore - async fn insert_bundle(&self, bundle: EthSendBundle) -> Result; + async fn insert_bundle(&self, bundle: BundleWithMetadata) -> Result; /// Fetch a bundle with metadata by its ID async fn get_bundle(&self, id: Uuid) -> Result>; diff --git a/crates/datastore/tests/datastore.rs b/crates/datastore/tests/datastore.rs index 8e71a3a5..b46a588e 100644 --- a/crates/datastore/tests/datastore.rs +++ b/crates/datastore/tests/datastore.rs @@ -6,7 +6,8 @@ use testcontainers_modules::{ postgres, testcontainers::{ContainerAsync, runners::AsyncRunner}, }; -use tips_datastore::postgres::{BlockInfoUpdate, BundleFilter, BundleState}; +use tips_common::{BundleState, BundleWithMetadata}; +use tips_datastore::postgres::{BlockInfoUpdate, BundleFilter}; use tips_datastore::{BundleDatastore, PostgresDatastore}; struct TestHarness { @@ -14,7 +15,7 @@ struct TestHarness { data_store: PostgresDatastore, } -async fn setup_datastore() -> eyre::Result { +async fn setup_datastore() -> anyhow::Result { let postgres_instance = postgres::Postgres::default().start().await?; let connection_string = format!( "postgres://postgres:postgres@{}:{}/postgres", @@ -38,8 +39,8 @@ const TX_DATA: Bytes = bytes!( const TX_HASH: TxHash = b256!("0x3ea7e1482485387e61150ee8e5c8cad48a14591789ac02cc2504046d96d0a5f4"); const TX_SENDER: Address = address!("0x24ae36512421f1d9f6e074f00ff5b8393f5dd925"); -fn create_test_bundle_with_reverting_tx() -> eyre::Result { - Ok(EthSendBundle { +fn create_test_bundle_with_reverting_tx() -> anyhow::Result { + let bundle = EthSendBundle { txs: vec![TX_DATA], block_number: 12345, min_timestamp: Some(1640995200), @@ -51,15 +52,18 @@ fn create_test_bundle_with_reverting_tx() -> eyre::Result { refund_recipient: None, refund_tx_hashes: vec![], extra_fields: Default::default(), - }) + }; + + let bundle_with_metadata = BundleWithMetadata::new(&bundle)?; + Ok(bundle_with_metadata) } fn create_test_bundle( block_number: u64, min_timestamp: Option, max_timestamp: Option, -) -> eyre::Result { - Ok(EthSendBundle { +) -> anyhow::Result { + let bundle = EthSendBundle { txs: vec![TX_DATA], block_number, min_timestamp, @@ -71,15 +75,22 @@ fn create_test_bundle( refund_recipient: None, refund_tx_hashes: vec![], extra_fields: Default::default(), - }) + }; + + let bundle_with_metadata = BundleWithMetadata::new(&bundle)?; + Ok(bundle_with_metadata) } #[tokio::test] -async fn insert_and_get() -> eyre::Result<()> { +async fn insert_and_get() -> anyhow::Result<()> { let harness = setup_datastore().await?; - let test_bundle = create_test_bundle_with_reverting_tx()?; + let test_bundle_with_metadata = create_test_bundle_with_reverting_tx()?; + let test_bundle = test_bundle_with_metadata.bundle.clone(); - let insert_result = harness.data_store.insert_bundle(test_bundle.clone()).await; + let insert_result = harness + .data_store + .insert_bundle(test_bundle_with_metadata) + .await; if let Err(ref err) = insert_result { eprintln!("Insert failed with error: {err:?}"); } @@ -140,7 +151,7 @@ async fn insert_and_get() -> eyre::Result<()> { } #[tokio::test] -async fn select_bundles_comprehensive() -> eyre::Result<()> { +async fn select_bundles_comprehensive() -> anyhow::Result<()> { let harness = setup_datastore().await?; let bundle1 = create_test_bundle(100, Some(1000), Some(2000))?; @@ -237,7 +248,7 @@ async fn select_bundles_comprehensive() -> eyre::Result<()> { } #[tokio::test] -async fn cancel_bundle_workflow() -> eyre::Result<()> { +async fn cancel_bundle_workflow() -> anyhow::Result<()> { let harness = setup_datastore().await?; let bundle1 = create_test_bundle(100, Some(1000), Some(2000))?; @@ -304,7 +315,7 @@ async fn cancel_bundle_workflow() -> eyre::Result<()> { } #[tokio::test] -async fn find_bundle_by_transaction_hash() -> eyre::Result<()> { +async fn find_bundle_by_transaction_hash() -> anyhow::Result<()> { let harness = setup_datastore().await?; let test_bundle = create_test_bundle_with_reverting_tx()?; @@ -334,7 +345,7 @@ async fn find_bundle_by_transaction_hash() -> eyre::Result<()> { } #[tokio::test] -async fn remove_bundles() -> eyre::Result<()> { +async fn remove_bundles() -> anyhow::Result<()> { let harness = setup_datastore().await?; let bundle1 = create_test_bundle(100, None, None)?; @@ -360,7 +371,7 @@ async fn remove_bundles() -> eyre::Result<()> { } #[tokio::test] -async fn update_bundles_state() -> eyre::Result<()> { +async fn update_bundles_state() -> anyhow::Result<()> { let harness = setup_datastore().await?; let bundle1 = create_test_bundle(100, None, None)?; @@ -389,7 +400,7 @@ async fn update_bundles_state() -> eyre::Result<()> { } #[tokio::test] -async fn block_info_operations() -> eyre::Result<()> { +async fn block_info_operations() -> anyhow::Result<()> { let harness = setup_datastore().await?; let initial_info = harness.data_store.get_current_block_info().await.unwrap(); @@ -443,7 +454,7 @@ async fn block_info_operations() -> eyre::Result<()> { } #[tokio::test] -async fn get_stats() -> eyre::Result<()> { +async fn get_stats() -> anyhow::Result<()> { let harness = setup_datastore().await?; let stats = harness.data_store.get_stats().await.unwrap(); @@ -475,7 +486,7 @@ async fn get_stats() -> eyre::Result<()> { } #[tokio::test] -async fn remove_timed_out_bundles() -> eyre::Result<()> { +async fn remove_timed_out_bundles() -> anyhow::Result<()> { let harness = setup_datastore().await?; let expired_bundle = create_test_bundle(100, None, Some(1000))?; @@ -516,7 +527,7 @@ async fn remove_timed_out_bundles() -> eyre::Result<()> { } #[tokio::test] -async fn remove_old_included_bundles() -> eyre::Result<()> { +async fn remove_old_included_bundles() -> anyhow::Result<()> { let harness = setup_datastore().await?; let bundle1 = create_test_bundle(100, None, None)?; diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index 13a581b3..e4ce0fa2 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -35,3 +35,4 @@ op-revm.workspace = true revm-context-interface.workspace = true alloy-signer-local.workspace = true reth-optimism-evm.workspace = true +tips-common.workspace = true diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index 94654bb1..a02a3e21 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -1,16 +1,16 @@ use alloy_primitives::B256; -use alloy_rpc_types_mev::EthSendBundle; use anyhow::Result; use async_trait::async_trait; use backon::{ExponentialBuilder, Retryable}; use rdkafka::producer::{FutureProducer, FutureRecord}; +use tips_common::BundleWithMetadata; use tokio::time::Duration; use tracing::{error, info}; /// A queue to buffer transactions #[async_trait] pub trait QueuePublisher: Send + Sync { - async fn publish(&self, bundle: &EthSendBundle, bundle_hash: &B256) -> Result<()>; + async fn publish(&self, bundle: &BundleWithMetadata, bundle_hash: &B256) -> Result<()>; } /// A queue to buffer transactions @@ -27,9 +27,9 @@ impl KafkaQueuePublisher { #[async_trait] impl QueuePublisher for KafkaQueuePublisher { - async fn publish(&self, bundle: &EthSendBundle, bundle_hash: &B256) -> Result<()> { + async fn publish(&self, bundle: &BundleWithMetadata, bundle_hash: &B256) -> Result<()> { let key = bundle_hash.to_string(); - let payload = serde_json::to_vec(bundle)?; + let payload = serde_json::to_vec(&bundle)?; let enqueue = || async { let record = FutureRecord::to(&self.topic).key(&key).payload(&payload); @@ -74,11 +74,12 @@ impl QueuePublisher for KafkaQueuePublisher { #[cfg(test)] mod tests { use super::*; + use alloy_rpc_types_mev::EthSendBundle; use rdkafka::config::ClientConfig; use tokio::time::{Duration, Instant}; - fn create_test_bundle() -> EthSendBundle { - EthSendBundle::default() + fn create_test_bundle() -> BundleWithMetadata { + BundleWithMetadata::new(&EthSendBundle::default()).unwrap() } #[tokio::test] @@ -92,7 +93,7 @@ mod tests { let publisher = KafkaQueuePublisher::new(producer, "tips-ingress-rpc".to_string()); let bundle = create_test_bundle(); - let bundle_hash = bundle.bundle_hash(); + let bundle_hash = bundle.bundle.bundle_hash(); let start = Instant::now(); let result = publisher.publish(&bundle, &bundle_hash).await; diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index a7991bd8..789bf9d7 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -11,6 +11,7 @@ use op_alloy_consensus::OpTxEnvelope; use op_alloy_network::Optimism; use reth_rpc_eth_types::EthApiError; use std::time::{SystemTime, UNIX_EPOCH}; +use tips_common::BundleWithMetadata; use tracing::{info, warn}; use crate::queue::QueuePublisher; @@ -60,11 +61,15 @@ where Queue: QueuePublisher + Sync + Send + 'static, { async fn send_bundle(&self, bundle: EthSendBundle) -> RpcResult { - self.validate_bundle(&bundle).await?; + let bundle_with_metadata = self.validate_bundle(&bundle).await?; // Queue the bundle let bundle_hash = bundle.bundle_hash(); - if let Err(e) = self.queue.publish(&bundle, &bundle_hash).await { + if let Err(e) = self + .queue + .publish(&bundle_with_metadata, &bundle_hash) + .await + { warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e); return Err(EthApiError::InvalidParams("Failed to queue bundle".into()).into_rpc_err()); } @@ -107,8 +112,14 @@ where }; // queue the bundle + let bundle_with_metadata = BundleWithMetadata::new(&bundle) + .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; let bundle_hash = bundle.bundle_hash(); - if let Err(e) = self.queue.publish(&bundle, &bundle_hash).await { + if let Err(e) = self + .queue + .publish(&bundle_with_metadata, &bundle_hash) + .await + { warn!(message = "Failed to publish Queue::enqueue_bundle", bundle_hash = %bundle_hash, error = %e); } @@ -164,7 +175,7 @@ where Ok(transaction) } - async fn validate_bundle(&self, bundle: &EthSendBundle) -> RpcResult<()> { + async fn validate_bundle(&self, bundle: &EthSendBundle) -> RpcResult { if bundle.txs.is_empty() { return Err( EthApiError::InvalidParams("Bundle cannot have empty transactions".into()) @@ -177,7 +188,10 @@ where let transaction = self.validate_tx(tx_data).await?; total_gas = total_gas.saturating_add(transaction.gas_limit()); } + validate_bundle(bundle, total_gas)?; - validate_bundle(bundle, total_gas) + let bundle_with_metadata = BundleWithMetadata::new(bundle) + .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; + Ok(bundle_with_metadata) } } diff --git a/crates/ingress-writer/Cargo.toml b/crates/ingress-writer/Cargo.toml index 2eda385b..e6528b7e 100644 --- a/crates/ingress-writer/Cargo.toml +++ b/crates/ingress-writer/Cargo.toml @@ -14,7 +14,6 @@ path = "src/main.rs" [dependencies] tips-datastore.workspace = true tips-audit.workspace=true -alloy-rpc-types-mev.workspace = true tokio.workspace = true tracing.workspace = true tracing-subscriber.workspace = true @@ -25,3 +24,4 @@ rdkafka.workspace = true serde_json.workspace = true backon.workspace = true uuid.workspace = true +tips-common.workspace = true diff --git a/crates/ingress-writer/src/main.rs b/crates/ingress-writer/src/main.rs index 116aac59..a64542b7 100644 --- a/crates/ingress-writer/src/main.rs +++ b/crates/ingress-writer/src/main.rs @@ -1,4 +1,3 @@ -use alloy_rpc_types_mev::EthSendBundle; use anyhow::Result; use backon::{ExponentialBuilder, Retryable}; use clap::Parser; @@ -10,6 +9,7 @@ use rdkafka::{ }; use std::fs; use tips_audit::{BundleEvent, BundleEventPublisher, KafkaBundleEventPublisher}; +use tips_common::BundleWithMetadata; use tips_datastore::{BundleDatastore, postgres::PostgresDatastore}; use tokio::time::Duration; use tracing::{debug, error, info, warn}; @@ -64,13 +64,13 @@ where }) } - async fn insert_bundle(&self) -> Result<(Uuid, EthSendBundle)> { + async fn insert_bundle(&self) -> Result<(Uuid, BundleWithMetadata)> { match self.queue_consumer.recv().await { Ok(message) => { let payload = message .payload() .ok_or_else(|| anyhow::anyhow!("Message has no payload"))?; - let bundle: EthSendBundle = serde_json::from_slice(payload)?; + let bundle: BundleWithMetadata = serde_json::from_slice(payload)?; debug!( bundle = ?bundle, offset = message.offset(), @@ -107,12 +107,12 @@ where } } - async fn publish(&self, bundle_id: Uuid, bundle: &EthSendBundle) { + async fn publish(&self, bundle_id: Uuid, bundle: &BundleWithMetadata) { if let Err(e) = self .publisher .publish(BundleEvent::Created { bundle_id, - bundle: bundle.clone(), + bundle: bundle.bundle.clone(), }) .await { diff --git a/crates/maintenance/Cargo.toml b/crates/maintenance/Cargo.toml index d94fd552..3037addb 100644 --- a/crates/maintenance/Cargo.toml +++ b/crates/maintenance/Cargo.toml @@ -32,3 +32,4 @@ op-alloy-rpc-types.workspace = true base-reth-flashblocks-rpc.workspace = true alloy-rpc-types-mev.workspace = true sqlx.workspace = true +tips-common.workspace = true diff --git a/crates/maintenance/src/job.rs b/crates/maintenance/src/job.rs index cbc09f19..1c14b992 100644 --- a/crates/maintenance/src/job.rs +++ b/crates/maintenance/src/job.rs @@ -15,8 +15,9 @@ use sqlx::types::chrono::Utc; use std::collections::HashSet; use std::time::Duration; use tips_audit::{BundleEvent, BundleEventPublisher, DropReason}; +use tips_common::{BundleState, BundleWithMetadata}; use tips_datastore::BundleDatastore; -use tips_datastore::postgres::{BlockInfoUpdate, BundleFilter, BundleState, BundleWithMetadata}; +use tips_datastore::postgres::{BlockInfoUpdate, BundleFilter}; use tokio::sync::mpsc; use tracing::{debug, error, info, warn}; use uuid::Uuid; @@ -399,7 +400,7 @@ mod tests { use alloy_primitives::{TxHash, b256}; use alloy_rpc_types_mev::EthSendBundle; use sqlx::types::chrono::Utc; - use tips_datastore::postgres::BundleState; + use tips_common::BundleState; const TX_1: TxHash = b256!("1111111111111111111111111111111111111111111111111111111111111111"); const TX_2: TxHash = b256!("2222222222222222222222222222222222222222222222222222222222222222");