From cd3d9cf9783e95a911ddc1637742602af0185d3a Mon Sep 17 00:00:00 2001 From: William Law Date: Thu, 23 Oct 2025 11:21:47 -0400 Subject: [PATCH 1/9] spike --- crates/ingress-rpc/src/service.rs | 87 ++++++++++++++++++++++++++++--- 1 file changed, 79 insertions(+), 8 deletions(-) diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index a48841d4..d0caaeaf 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -1,5 +1,4 @@ -use crate::validation::{AccountInfoLookup, L1BlockInfoLookup, validate_tx}; -use alloy_consensus::transaction::SignerRecoverable; +use alloy_consensus::{Transaction, transaction::SignerRecoverable}; use alloy_primitives::{B256, Bytes}; use alloy_provider::{Provider, RootProvider, network::eip2718::Decodable2718}; use alloy_rpc_types_mev::{EthBundleHash, EthCancelBundle, EthSendBundle}; @@ -10,10 +9,14 @@ use jsonrpsee::{ use op_alloy_consensus::OpTxEnvelope; use op_alloy_network::Optimism; use reth_rpc_eth_types::EthApiError; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tracing::{info, warn}; use crate::queue::QueuePublisher; +use crate::validation::{AccountInfoLookup, L1BlockInfoLookup, validate_tx}; + +// TODO: make this configurable +const MAX_BUNDLE_GAS: u64 = 30_000_000; #[rpc(server, namespace = "eth")] pub trait IngressApi { @@ -58,12 +61,80 @@ impl IngressApiServer for IngressService where Queue: QueuePublisher + Sync + Send + 'static, { - async fn send_bundle(&self, _bundle: EthSendBundle) -> RpcResult { - warn!( - message = "TODO: implement send_bundle", - method = "send_bundle" + async fn send_bundle(&self, bundle: EthSendBundle) -> RpcResult { + if bundle.txs.is_empty() { + return Err( + EthApiError::InvalidParams("Bundle cannot have empty transactions".into()) + .into_rpc_err(), + ); + } + + // Don't allow bundles to be submitted over 1 hour into the future + // TODO: make the window configurable + let valid_timestamp_window = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + + Duration::from_secs(3600).as_secs(); + if let Some(max_timestamp) = bundle.max_timestamp { + if max_timestamp > valid_timestamp_window { + return Err(EthApiError::InvalidParams( + "Bundle cannot be more than 1 hour in the future".into(), + ) + .into_rpc_err()); + } + } + + // Decode and validate all transactions + let mut decoded_txs = Vec::new(); + let mut total_gas = 0u64; + for tx_data in &bundle.txs { + if tx_data.is_empty() { + return Err(EthApiError::EmptyRawTransactionData.into_rpc_err()); + } + + let envelope = OpTxEnvelope::decode_2718_exact(tx_data.iter().as_slice()) + .map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?; + + let transaction = envelope + .clone() + .try_into_recovered() + .map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?; + + // Add gas limit to total + total_gas = total_gas.saturating_add(transaction.gas_limit()); + + decoded_txs.push(transaction); + } + + // Check max gas limit for the entire bundle + if total_gas > MAX_BUNDLE_GAS { + return Err(EthApiError::InvalidParams(format!( + "Bundle gas limit {total_gas} exceeds maximum allowed {MAX_BUNDLE_GAS}" + )) + .into_rpc_err()); + } + + // For now, we'll use the first transaction's signer as the bundle sender + // In a real implementation, you might want different logic here + let sender = decoded_txs[0].signer(); + + // Queue the bundle + if let Err(e) = self.queue.publish(&bundle, sender).await { + warn!(message = "Failed to publish bundle to queue", sender = %sender, error = %e); + return Err(EthApiError::InvalidParams("Failed to queue bundle".into()).into_rpc_err()); + } + + info!( + message = "queued bundle", + tx_count = bundle.txs.len(), + total_gas = total_gas, + sender = %sender ); - todo!("implement send_bundle") + + Ok(EthBundleHash { + bundle_hash: bundle.bundle_hash(), + }) } async fn cancel_bundle(&self, _request: EthCancelBundle) -> RpcResult<()> { From 43311b34c0895c4983909daebf9ef10a65fb3304 Mon Sep 17 00:00:00 2001 From: William Law Date: Thu, 23 Oct 2025 13:07:28 -0400 Subject: [PATCH 2/9] use bundle hash as key --- crates/ingress-rpc/src/queue.rs | 20 ++++++++++---------- crates/ingress-rpc/src/service.rs | 20 +++++++------------- 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index 4a685de2..ddd3baa7 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -1,4 +1,4 @@ -use alloy_primitives::Address; +use alloy_primitives::B256; use alloy_rpc_types_mev::EthSendBundle; use anyhow::{Error, Result}; use async_trait::async_trait; @@ -10,7 +10,7 @@ use tracing::{error, info}; /// A queue to buffer transactions #[async_trait] pub trait QueuePublisher: Send + Sync { - async fn publish(&self, bundle: &EthSendBundle, sender: Address) -> Result<()>; + async fn publish(&self, bundle: &EthSendBundle, bundle_hash: &B256) -> Result<()>; } /// A queue to buffer transactions @@ -27,9 +27,9 @@ impl KafkaQueuePublisher { pub async fn enqueue_bundle( &self, bundle: &EthSendBundle, - sender: Address, + bundle_hash: &B256, ) -> Result<(), Error> { - let key = sender.to_string(); + let key = bundle_hash.to_string(); let payload = serde_json::to_vec(bundle)?; let enqueue = || async { @@ -38,7 +38,7 @@ impl KafkaQueuePublisher { match self.producer.send(record, Duration::from_secs(5)).await { Ok((partition, offset)) => { info!( - sender = %sender, + bundle_hash = %bundle_hash, partition = partition, offset = offset, topic = %self.topic, @@ -48,7 +48,7 @@ impl KafkaQueuePublisher { } Err((err, _)) => { error!( - sender = %sender, + bundle_hash = %bundle_hash, error = %err, topic = %self.topic, "Failed to enqueue bundle" @@ -74,8 +74,8 @@ impl KafkaQueuePublisher { #[async_trait] impl QueuePublisher for KafkaQueuePublisher { - async fn publish(&self, bundle: &EthSendBundle, sender: Address) -> Result<()> { - self.enqueue_bundle(bundle, sender).await + async fn publish(&self, bundle: &EthSendBundle, bundle_hash: &B256) -> Result<()> { + self.enqueue_bundle(bundle, bundle_hash).await } } @@ -100,10 +100,10 @@ mod tests { let publisher = KafkaQueuePublisher::new(producer, "tips-ingress-rpc".to_string()); let bundle = create_test_bundle(); - let sender = Address::ZERO; + let bundle_hash = bundle.bundle_hash(); let start = Instant::now(); - let result = publisher.enqueue_bundle(&bundle, sender).await; + let result = publisher.enqueue_bundle(&bundle, &bundle_hash).await; let elapsed = start.elapsed(); // the backoff tries at minimum 100ms, so verify we tried at least once diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index d0caaeaf..2b096adf 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -86,7 +86,6 @@ where } // Decode and validate all transactions - let mut decoded_txs = Vec::new(); let mut total_gas = 0u64; for tx_data in &bundle.txs { if tx_data.is_empty() { @@ -103,8 +102,6 @@ where // Add gas limit to total total_gas = total_gas.saturating_add(transaction.gas_limit()); - - decoded_txs.push(transaction); } // Check max gas limit for the entire bundle @@ -115,21 +112,18 @@ where .into_rpc_err()); } - // For now, we'll use the first transaction's signer as the bundle sender - // In a real implementation, you might want different logic here - let sender = decoded_txs[0].signer(); - // Queue the bundle - if let Err(e) = self.queue.publish(&bundle, sender).await { - warn!(message = "Failed to publish bundle to queue", sender = %sender, error = %e); + let bundle_hash = bundle.bundle_hash(); + if let Err(e) = self.queue.publish(&bundle, &bundle_hash).await { + warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e); return Err(EthApiError::InvalidParams("Failed to queue bundle".into()).into_rpc_err()); } info!( message = "queued bundle", + bundle_hash = %bundle_hash, tx_count = bundle.txs.len(), total_gas = total_gas, - sender = %sender ); Ok(EthBundleHash { @@ -181,9 +175,9 @@ where }; // queue the bundle - let sender = transaction.signer(); - if let Err(e) = self.queue.publish(&bundle, sender).await { - warn!(message = "Failed to publish Queue::enqueue_bundle", sender = %sender, error = %e); + let bundle_hash = bundle.bundle_hash(); + if let Err(e) = self.queue.publish(&bundle, &bundle_hash).await { + warn!(message = "Failed to publish Queue::enqueue_bundle", bundle_hash = %bundle_hash, error = %e); } info!(message="queued singleton bundle", txn_hash=%transaction.tx_hash()); From 75ef8510c0d0d37627d47d8b0e7dd0badcffc274 Mon Sep 17 00:00:00 2001 From: William Law Date: Thu, 23 Oct 2025 13:23:50 -0400 Subject: [PATCH 3/9] validate txs in bundle --- crates/ingress-rpc/src/service.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 2b096adf..4366be3a 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -100,7 +100,12 @@ where .try_into_recovered() .map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?; - // Add gas limit to total + let mut l1_block_info = self.provider.fetch_l1_block_info().await?; + let account = self + .provider + .fetch_account_info(transaction.signer()) + .await?; + validate_tx(account, &transaction, tx_data, &mut l1_block_info).await?; total_gas = total_gas.saturating_add(transaction.gas_limit()); } From 1907020e22736fd79cc039941a17acda841beb3c Mon Sep 17 00:00:00 2001 From: William Law Date: Thu, 23 Oct 2025 13:39:07 -0400 Subject: [PATCH 4/9] refactor validate_tx --- crates/ingress-rpc/src/service.rs | 67 ++++++++++++++----------------- 1 file changed, 31 insertions(+), 36 deletions(-) diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 4366be3a..9d814bd5 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -1,3 +1,4 @@ +use alloy_consensus::transaction::Recovered; use alloy_consensus::{Transaction, transaction::SignerRecoverable}; use alloy_primitives::{B256, Bytes}; use alloy_provider::{Provider, RootProvider, network::eip2718::Decodable2718}; @@ -88,24 +89,7 @@ where // Decode and validate all transactions let mut total_gas = 0u64; for tx_data in &bundle.txs { - if tx_data.is_empty() { - return Err(EthApiError::EmptyRawTransactionData.into_rpc_err()); - } - - let envelope = OpTxEnvelope::decode_2718_exact(tx_data.iter().as_slice()) - .map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?; - - let transaction = envelope - .clone() - .try_into_recovered() - .map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?; - - let mut l1_block_info = self.provider.fetch_l1_block_info().await?; - let account = self - .provider - .fetch_account_info(transaction.signer()) - .await?; - validate_tx(account, &transaction, tx_data, &mut l1_block_info).await?; + let transaction = self.validate_tx(tx_data).await?; total_gas = total_gas.saturating_add(transaction.gas_limit()); } @@ -145,24 +129,7 @@ where } async fn send_raw_transaction(&self, data: Bytes) -> RpcResult { - if data.is_empty() { - return Err(EthApiError::EmptyRawTransactionData.into_rpc_err()); - } - - let envelope = OpTxEnvelope::decode_2718_exact(data.iter().as_slice()) - .map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?; - - let transaction = envelope - .clone() - .try_into_recovered() - .map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?; - - let mut l1_block_info = self.provider.fetch_l1_block_info().await?; - let account = self - .provider - .fetch_account_info(transaction.signer()) - .await?; - validate_tx(account, &transaction, &data, &mut l1_block_info).await?; + let transaction = self.validate_tx(&data).await?; let expiry_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -209,3 +176,31 @@ where Ok(transaction.tx_hash()) } } + +impl IngressService +where + Queue: QueuePublisher + Sync + Send + 'static, +{ + async fn validate_tx(&self, data: &Bytes) -> RpcResult> { + if data.is_empty() { + return Err(EthApiError::EmptyRawTransactionData.into_rpc_err()); + } + + let envelope = OpTxEnvelope::decode_2718_exact(data.iter().as_slice()) + .map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?; + + let transaction = envelope + .clone() + .try_into_recovered() + .map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?; + + let mut l1_block_info = self.provider.fetch_l1_block_info().await?; + let account = self + .provider + .fetch_account_info(transaction.signer()) + .await?; + validate_tx(account, &transaction, data, &mut l1_block_info).await?; + + Ok(transaction) + } +} From eb8d2a12e989bd85b61f3184f05590f64369bd18 Mon Sep 17 00:00:00 2001 From: William Law Date: Thu, 23 Oct 2025 14:06:54 -0400 Subject: [PATCH 5/9] refactor validate_bundle --- crates/ingress-rpc/src/service.rs | 63 +++++++++------------------- crates/ingress-rpc/src/validation.rs | 36 ++++++++++++++++ 2 files changed, 56 insertions(+), 43 deletions(-) diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 9d814bd5..1474c057 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -10,14 +10,11 @@ use jsonrpsee::{ use op_alloy_consensus::OpTxEnvelope; use op_alloy_network::Optimism; use reth_rpc_eth_types::EthApiError; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{SystemTime, UNIX_EPOCH}; use tracing::{info, warn}; use crate::queue::QueuePublisher; -use crate::validation::{AccountInfoLookup, L1BlockInfoLookup, validate_tx}; - -// TODO: make this configurable -const MAX_BUNDLE_GAS: u64 = 30_000_000; +use crate::validation::{AccountInfoLookup, L1BlockInfoLookup, validate_bundle, validate_tx}; #[rpc(server, namespace = "eth")] pub trait IngressApi { @@ -63,43 +60,7 @@ where Queue: QueuePublisher + Sync + Send + 'static, { async fn send_bundle(&self, bundle: EthSendBundle) -> RpcResult { - if bundle.txs.is_empty() { - return Err( - EthApiError::InvalidParams("Bundle cannot have empty transactions".into()) - .into_rpc_err(), - ); - } - - // Don't allow bundles to be submitted over 1 hour into the future - // TODO: make the window configurable - let valid_timestamp_window = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs() - + Duration::from_secs(3600).as_secs(); - if let Some(max_timestamp) = bundle.max_timestamp { - if max_timestamp > valid_timestamp_window { - return Err(EthApiError::InvalidParams( - "Bundle cannot be more than 1 hour in the future".into(), - ) - .into_rpc_err()); - } - } - - // Decode and validate all transactions - let mut total_gas = 0u64; - for tx_data in &bundle.txs { - let transaction = self.validate_tx(tx_data).await?; - total_gas = total_gas.saturating_add(transaction.gas_limit()); - } - - // Check max gas limit for the entire bundle - if total_gas > MAX_BUNDLE_GAS { - return Err(EthApiError::InvalidParams(format!( - "Bundle gas limit {total_gas} exceeds maximum allowed {MAX_BUNDLE_GAS}" - )) - .into_rpc_err()); - } + self.validate_bundle(&bundle).await?; // Queue the bundle let bundle_hash = bundle.bundle_hash(); @@ -112,7 +73,6 @@ where message = "queued bundle", bundle_hash = %bundle_hash, tx_count = bundle.txs.len(), - total_gas = total_gas, ); Ok(EthBundleHash { @@ -203,4 +163,21 @@ where Ok(transaction) } + + async fn validate_bundle(&self, bundle: &EthSendBundle) -> RpcResult<()> { + if bundle.txs.is_empty() { + return Err( + EthApiError::InvalidParams("Bundle cannot have empty transactions".into()) + .into_rpc_err(), + ); + } + + let mut total_gas = 0u64; + for tx_data in &bundle.txs { + let transaction = self.validate_tx(tx_data).await?; + total_gas = total_gas.saturating_add(transaction.gas_limit()); + } + + validate_bundle(bundle, total_gas).await + } } diff --git a/crates/ingress-rpc/src/validation.rs b/crates/ingress-rpc/src/validation.rs index cacc66d8..9b3603af 100644 --- a/crates/ingress-rpc/src/validation.rs +++ b/crates/ingress-rpc/src/validation.rs @@ -2,6 +2,7 @@ use alloy_consensus::private::alloy_eips::{BlockId, BlockNumberOrTag}; use alloy_consensus::{Transaction, Typed2718, constants::KECCAK_EMPTY, transaction::Recovered}; use alloy_primitives::{Address, B256, U256}; use alloy_provider::{Provider, RootProvider}; +use alloy_rpc_types_mev::EthSendBundle; use async_trait::async_trait; use jsonrpsee::core::RpcResult; use op_alloy_consensus::interop::CROSS_L2_INBOX_ADDRESS; @@ -9,8 +10,12 @@ use op_alloy_network::Optimism; use op_revm::{OpSpecId, l1block::L1BlockInfo}; use reth_optimism_evm::extract_l1_info_from_tx; use reth_rpc_eth_types::{EthApiError, RpcInvalidTransactionError, SignError}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tracing::warn; +// TODO: make this configurable +const MAX_BUNDLE_GAS: u64 = 30_000_000; + /// Account info for a given address pub struct AccountInfo { pub balance: U256, @@ -158,6 +163,37 @@ pub async fn validate_tx( Ok(()) } +/// Helper function to validate propeties of a bundle. A bundle is valid if it satisfies the following criteria: +/// - The bundle's max_timestamp is not more than 1 hour in the future +/// - The bundle's gas limit is not greater than the maximum allowed gas limit +pub async fn validate_bundle(bundle: &EthSendBundle, bundle_gas: u64) -> RpcResult<()> { + // Don't allow bundles to be submitted over 1 hour into the future + // TODO: make the window configurable + let valid_timestamp_window = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + + Duration::from_secs(3600).as_secs(); + if let Some(max_timestamp) = bundle.max_timestamp { + if max_timestamp > valid_timestamp_window { + return Err(EthApiError::InvalidParams( + "Bundle cannot be more than 1 hour in the future".into(), + ) + .into_rpc_err()); + } + } + + // Check max gas limit for the entire bundle + if bundle_gas > MAX_BUNDLE_GAS { + return Err(EthApiError::InvalidParams(format!( + "Bundle gas limit {bundle_gas} exceeds maximum allowed {MAX_BUNDLE_GAS}" + )) + .into_rpc_err()); + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; From ff03427f5d66f84495df362095caa1b7e85944c1 Mon Sep 17 00:00:00 2001 From: William Law Date: Thu, 23 Oct 2025 14:27:55 -0400 Subject: [PATCH 6/9] add unit tests --- crates/ingress-rpc/src/validation.rs | 82 ++++++++++++++++++++++++++-- 1 file changed, 78 insertions(+), 4 deletions(-) diff --git a/crates/ingress-rpc/src/validation.rs b/crates/ingress-rpc/src/validation.rs index 9b3603af..e5587ea3 100644 --- a/crates/ingress-rpc/src/validation.rs +++ b/crates/ingress-rpc/src/validation.rs @@ -185,10 +185,10 @@ pub async fn validate_bundle(bundle: &EthSendBundle, bundle_gas: u64) -> RpcResu // Check max gas limit for the entire bundle if bundle_gas > MAX_BUNDLE_GAS { - return Err(EthApiError::InvalidParams(format!( - "Bundle gas limit {bundle_gas} exceeds maximum allowed {MAX_BUNDLE_GAS}" - )) - .into_rpc_err()); + return Err( + EthApiError::InvalidParams("Bundle gas limit exceeds maximum allowed".into()) + .into_rpc_err(), + ); } Ok(()) @@ -200,11 +200,14 @@ mod tests { use alloy_consensus::SignableTransaction; use alloy_consensus::{Transaction, constants::KECCAK_EMPTY, transaction::SignerRecoverable}; use alloy_consensus::{TxEip1559, TxEip4844, TxEip7702}; + use alloy_primitives::Bytes; use alloy_primitives::{bytes, keccak256}; use alloy_signer_local::PrivateKeySigner; use op_alloy_consensus::OpTxEnvelope; use op_alloy_network::TxSignerSync; + use op_alloy_network::eip2718::Encodable2718; use revm_context_interface::transaction::{AccessList, AccessListItem}; + use std::time::{SystemTime, UNIX_EPOCH}; fn create_account(nonce: u64, balance: U256) -> AccountInfo { AccountInfo { @@ -488,4 +491,75 @@ mod tests { .into_rpc_err()) ); } + + #[tokio::test] + async fn test_err_bundle_max_timestamp_too_far_in_the_future() { + let current_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + let too_far_in_the_future = current_time + 3601; + let bundle = EthSendBundle { + txs: vec![], + max_timestamp: Some(too_far_in_the_future), + ..Default::default() + }; + assert_eq!( + validate_bundle(&bundle, 0).await, + Err(EthApiError::InvalidParams( + "Bundle cannot be more than 1 hour in the future".into() + ) + .into_rpc_err()) + ); + } + + #[tokio::test] + async fn test_err_bundle_max_gas_limit_too_high() { + let signer = PrivateKeySigner::random(); + let mut encoded_txs = vec![]; + + // Create transactions that collectively exceed MAX_BUNDLE_GAS (30M) + // Each transaction uses 4M gas, so 8 transactions = 32M gas > 30M limit + let gas = 4_000_000; + let mut total_gas = 0u64; + for _ in 0..8 { + let mut tx = TxEip1559 { + chain_id: 1, + nonce: 0, + gas_limit: gas, + max_fee_per_gas: 200000u128, + max_priority_fee_per_gas: 100000u128, + to: Address::random().into(), + value: U256::from(1000000u128), + access_list: Default::default(), + input: bytes!("").clone(), + }; + total_gas = total_gas.saturating_add(gas); + + let signature = signer.sign_transaction_sync(&mut tx).unwrap(); + let envelope = OpTxEnvelope::Eip1559(tx.into_signed(signature)); + + // Encode the transaction + let mut encoded = vec![]; + envelope.encode_2718(&mut encoded); + encoded_txs.push(Bytes::from(encoded)); + } + + let bundle = EthSendBundle { + txs: encoded_txs, + block_number: 0, + min_timestamp: None, + max_timestamp: None, + reverting_tx_hashes: vec![], + ..Default::default() + }; + + // Test should fail due to exceeding gas limit + let result = validate_bundle(&bundle, total_gas).await; + assert!(result.is_err()); + if let Err(e) = result { + let error_message = format!("{e:?}"); + assert!(error_message.contains("Bundle gas limit exceeds maximum allowed")); + } + } } From 70055733e0cd022a9882461ee7e3ee2027e2cf8e Mon Sep 17 00:00:00 2001 From: William Law Date: Thu, 23 Oct 2025 14:29:43 -0400 Subject: [PATCH 7/9] make clippy happy --- crates/ingress-rpc/src/validation.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/ingress-rpc/src/validation.rs b/crates/ingress-rpc/src/validation.rs index e5587ea3..07598909 100644 --- a/crates/ingress-rpc/src/validation.rs +++ b/crates/ingress-rpc/src/validation.rs @@ -174,13 +174,13 @@ pub async fn validate_bundle(bundle: &EthSendBundle, bundle_gas: u64) -> RpcResu .unwrap() .as_secs() + Duration::from_secs(3600).as_secs(); - if let Some(max_timestamp) = bundle.max_timestamp { - if max_timestamp > valid_timestamp_window { - return Err(EthApiError::InvalidParams( - "Bundle cannot be more than 1 hour in the future".into(), - ) - .into_rpc_err()); - } + if let Some(max_timestamp) = bundle.max_timestamp + && max_timestamp > valid_timestamp_window + { + return Err(EthApiError::InvalidParams( + "Bundle cannot be more than 1 hour in the future".into(), + ) + .into_rpc_err()); } // Check max gas limit for the entire bundle From 0771f7be32b117786bbfa41b8275425c7fc558a6 Mon Sep 17 00:00:00 2001 From: William Law Date: Thu, 23 Oct 2025 14:39:09 -0400 Subject: [PATCH 8/9] crate::validate_bundle doesnt need async --- crates/ingress-rpc/src/service.rs | 2 +- crates/ingress-rpc/src/validation.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 1474c057..a7991bd8 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -178,6 +178,6 @@ where total_gas = total_gas.saturating_add(transaction.gas_limit()); } - validate_bundle(bundle, total_gas).await + validate_bundle(bundle, total_gas) } } diff --git a/crates/ingress-rpc/src/validation.rs b/crates/ingress-rpc/src/validation.rs index 07598909..b9d1d2e4 100644 --- a/crates/ingress-rpc/src/validation.rs +++ b/crates/ingress-rpc/src/validation.rs @@ -166,7 +166,7 @@ pub async fn validate_tx( /// Helper function to validate propeties of a bundle. A bundle is valid if it satisfies the following criteria: /// - The bundle's max_timestamp is not more than 1 hour in the future /// - The bundle's gas limit is not greater than the maximum allowed gas limit -pub async fn validate_bundle(bundle: &EthSendBundle, bundle_gas: u64) -> RpcResult<()> { +pub fn validate_bundle(bundle: &EthSendBundle, bundle_gas: u64) -> RpcResult<()> { // Don't allow bundles to be submitted over 1 hour into the future // TODO: make the window configurable let valid_timestamp_window = SystemTime::now() @@ -505,7 +505,7 @@ mod tests { ..Default::default() }; assert_eq!( - validate_bundle(&bundle, 0).await, + validate_bundle(&bundle, 0), Err(EthApiError::InvalidParams( "Bundle cannot be more than 1 hour in the future".into() ) @@ -555,7 +555,7 @@ mod tests { }; // Test should fail due to exceeding gas limit - let result = validate_bundle(&bundle, total_gas).await; + let result = validate_bundle(&bundle, total_gas); assert!(result.is_err()); if let Err(e) = result { let error_message = format!("{e:?}"); From 884e5c692816b27bf535052d6fa9b34614842d60 Mon Sep 17 00:00:00 2001 From: William Law Date: Thu, 23 Oct 2025 15:16:24 -0400 Subject: [PATCH 9/9] combine publish/enqueue fns --- crates/ingress-rpc/src/queue.rs | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index ddd3baa7..94654bb1 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -1,6 +1,6 @@ use alloy_primitives::B256; use alloy_rpc_types_mev::EthSendBundle; -use anyhow::{Error, Result}; +use anyhow::Result; use async_trait::async_trait; use backon::{ExponentialBuilder, Retryable}; use rdkafka::producer::{FutureProducer, FutureRecord}; @@ -23,12 +23,11 @@ impl KafkaQueuePublisher { pub fn new(producer: FutureProducer, topic: String) -> Self { Self { producer, topic } } +} - pub async fn enqueue_bundle( - &self, - bundle: &EthSendBundle, - bundle_hash: &B256, - ) -> Result<(), Error> { +#[async_trait] +impl QueuePublisher for KafkaQueuePublisher { + async fn publish(&self, bundle: &EthSendBundle, bundle_hash: &B256) -> Result<()> { let key = bundle_hash.to_string(); let payload = serde_json::to_vec(bundle)?; @@ -72,13 +71,6 @@ impl KafkaQueuePublisher { } } -#[async_trait] -impl QueuePublisher for KafkaQueuePublisher { - async fn publish(&self, bundle: &EthSendBundle, bundle_hash: &B256) -> Result<()> { - self.enqueue_bundle(bundle, bundle_hash).await - } -} - #[cfg(test)] mod tests { use super::*; @@ -103,7 +95,7 @@ mod tests { let bundle_hash = bundle.bundle_hash(); let start = Instant::now(); - let result = publisher.enqueue_bundle(&bundle, &bundle_hash).await; + let result = publisher.publish(&bundle, &bundle_hash).await; let elapsed = start.elapsed(); // the backoff tries at minimum 100ms, so verify we tried at least once