Skip to content

Commit 5b88139

Browse files
authored
feat: introduce eth_sendBundle (#34)
* spike * use bundle hash as key * validate txs in bundle * refactor validate_tx * refactor validate_bundle * add unit tests * make clippy happy * crate::validate_bundle doesnt need async * combine publish/enqueue fns
1 parent edfa362 commit 5b88139

File tree

3 files changed

+192
-48
lines changed

3 files changed

+192
-48
lines changed

crates/ingress-rpc/src/queue.rs

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use alloy_primitives::Address;
1+
use alloy_primitives::B256;
22
use alloy_rpc_types_mev::EthSendBundle;
3-
use anyhow::{Error, Result};
3+
use anyhow::Result;
44
use async_trait::async_trait;
55
use backon::{ExponentialBuilder, Retryable};
66
use rdkafka::producer::{FutureProducer, FutureRecord};
@@ -10,7 +10,7 @@ use tracing::{error, info};
1010
/// A queue to buffer transactions
1111
#[async_trait]
1212
pub trait QueuePublisher: Send + Sync {
13-
async fn publish(&self, bundle: &EthSendBundle, sender: Address) -> Result<()>;
13+
async fn publish(&self, bundle: &EthSendBundle, bundle_hash: &B256) -> Result<()>;
1414
}
1515

1616
/// A queue to buffer transactions
@@ -23,13 +23,12 @@ impl KafkaQueuePublisher {
2323
pub fn new(producer: FutureProducer, topic: String) -> Self {
2424
Self { producer, topic }
2525
}
26+
}
2627

27-
pub async fn enqueue_bundle(
28-
&self,
29-
bundle: &EthSendBundle,
30-
sender: Address,
31-
) -> Result<(), Error> {
32-
let key = sender.to_string();
28+
#[async_trait]
29+
impl QueuePublisher for KafkaQueuePublisher {
30+
async fn publish(&self, bundle: &EthSendBundle, bundle_hash: &B256) -> Result<()> {
31+
let key = bundle_hash.to_string();
3332
let payload = serde_json::to_vec(bundle)?;
3433

3534
let enqueue = || async {
@@ -38,7 +37,7 @@ impl KafkaQueuePublisher {
3837
match self.producer.send(record, Duration::from_secs(5)).await {
3938
Ok((partition, offset)) => {
4039
info!(
41-
sender = %sender,
40+
bundle_hash = %bundle_hash,
4241
partition = partition,
4342
offset = offset,
4443
topic = %self.topic,
@@ -48,7 +47,7 @@ impl KafkaQueuePublisher {
4847
}
4948
Err((err, _)) => {
5049
error!(
51-
sender = %sender,
50+
bundle_hash = %bundle_hash,
5251
error = %err,
5352
topic = %self.topic,
5453
"Failed to enqueue bundle"
@@ -72,13 +71,6 @@ impl KafkaQueuePublisher {
7271
}
7372
}
7473

75-
#[async_trait]
76-
impl QueuePublisher for KafkaQueuePublisher {
77-
async fn publish(&self, bundle: &EthSendBundle, sender: Address) -> Result<()> {
78-
self.enqueue_bundle(bundle, sender).await
79-
}
80-
}
81-
8274
#[cfg(test)]
8375
mod tests {
8476
use super::*;
@@ -100,10 +92,10 @@ mod tests {
10092

10193
let publisher = KafkaQueuePublisher::new(producer, "tips-ingress-rpc".to_string());
10294
let bundle = create_test_bundle();
103-
let sender = Address::ZERO;
95+
let bundle_hash = bundle.bundle_hash();
10496

10597
let start = Instant::now();
106-
let result = publisher.enqueue_bundle(&bundle, sender).await;
98+
let result = publisher.publish(&bundle, &bundle_hash).await;
10799
let elapsed = start.elapsed();
108100

109101
// the backoff tries at minimum 100ms, so verify we tried at least once

crates/ingress-rpc/src/service.rs

Lines changed: 70 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use crate::validation::{AccountInfoLookup, L1BlockInfoLookup, validate_tx};
2-
use alloy_consensus::transaction::SignerRecoverable;
1+
use alloy_consensus::transaction::Recovered;
2+
use alloy_consensus::{Transaction, transaction::SignerRecoverable};
33
use alloy_primitives::{B256, Bytes};
44
use alloy_provider::{Provider, RootProvider, network::eip2718::Decodable2718};
55
use alloy_rpc_types_mev::{EthBundleHash, EthCancelBundle, EthSendBundle};
@@ -14,6 +14,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
1414
use tracing::{info, warn};
1515

1616
use crate::queue::QueuePublisher;
17+
use crate::validation::{AccountInfoLookup, L1BlockInfoLookup, validate_bundle, validate_tx};
1718

1819
#[rpc(server, namespace = "eth")]
1920
pub trait IngressApi {
@@ -58,12 +59,25 @@ impl<Queue> IngressApiServer for IngressService<Queue>
5859
where
5960
Queue: QueuePublisher + Sync + Send + 'static,
6061
{
61-
async fn send_bundle(&self, _bundle: EthSendBundle) -> RpcResult<EthBundleHash> {
62-
warn!(
63-
message = "TODO: implement send_bundle",
64-
method = "send_bundle"
62+
async fn send_bundle(&self, bundle: EthSendBundle) -> RpcResult<EthBundleHash> {
63+
self.validate_bundle(&bundle).await?;
64+
65+
// Queue the bundle
66+
let bundle_hash = bundle.bundle_hash();
67+
if let Err(e) = self.queue.publish(&bundle, &bundle_hash).await {
68+
warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e);
69+
return Err(EthApiError::InvalidParams("Failed to queue bundle".into()).into_rpc_err());
70+
}
71+
72+
info!(
73+
message = "queued bundle",
74+
bundle_hash = %bundle_hash,
75+
tx_count = bundle.txs.len(),
6576
);
66-
todo!("implement send_bundle")
77+
78+
Ok(EthBundleHash {
79+
bundle_hash: bundle.bundle_hash(),
80+
})
6781
}
6882

6983
async fn cancel_bundle(&self, _request: EthCancelBundle) -> RpcResult<()> {
@@ -75,24 +89,7 @@ where
7589
}
7690

7791
async fn send_raw_transaction(&self, data: Bytes) -> RpcResult<B256> {
78-
if data.is_empty() {
79-
return Err(EthApiError::EmptyRawTransactionData.into_rpc_err());
80-
}
81-
82-
let envelope = OpTxEnvelope::decode_2718_exact(data.iter().as_slice())
83-
.map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?;
84-
85-
let transaction = envelope
86-
.clone()
87-
.try_into_recovered()
88-
.map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?;
89-
90-
let mut l1_block_info = self.provider.fetch_l1_block_info().await?;
91-
let account = self
92-
.provider
93-
.fetch_account_info(transaction.signer())
94-
.await?;
95-
validate_tx(account, &transaction, &data, &mut l1_block_info).await?;
92+
let transaction = self.validate_tx(&data).await?;
9693

9794
let expiry_timestamp = SystemTime::now()
9895
.duration_since(UNIX_EPOCH)
@@ -110,9 +107,9 @@ where
110107
};
111108

112109
// queue the bundle
113-
let sender = transaction.signer();
114-
if let Err(e) = self.queue.publish(&bundle, sender).await {
115-
warn!(message = "Failed to publish Queue::enqueue_bundle", sender = %sender, error = %e);
110+
let bundle_hash = bundle.bundle_hash();
111+
if let Err(e) = self.queue.publish(&bundle, &bundle_hash).await {
112+
warn!(message = "Failed to publish Queue::enqueue_bundle", bundle_hash = %bundle_hash, error = %e);
116113
}
117114

118115
info!(message="queued singleton bundle", txn_hash=%transaction.tx_hash());
@@ -139,3 +136,48 @@ where
139136
Ok(transaction.tx_hash())
140137
}
141138
}
139+
140+
impl<Queue> IngressService<Queue>
141+
where
142+
Queue: QueuePublisher + Sync + Send + 'static,
143+
{
144+
async fn validate_tx(&self, data: &Bytes) -> RpcResult<Recovered<OpTxEnvelope>> {
145+
if data.is_empty() {
146+
return Err(EthApiError::EmptyRawTransactionData.into_rpc_err());
147+
}
148+
149+
let envelope = OpTxEnvelope::decode_2718_exact(data.iter().as_slice())
150+
.map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?;
151+
152+
let transaction = envelope
153+
.clone()
154+
.try_into_recovered()
155+
.map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?;
156+
157+
let mut l1_block_info = self.provider.fetch_l1_block_info().await?;
158+
let account = self
159+
.provider
160+
.fetch_account_info(transaction.signer())
161+
.await?;
162+
validate_tx(account, &transaction, data, &mut l1_block_info).await?;
163+
164+
Ok(transaction)
165+
}
166+
167+
async fn validate_bundle(&self, bundle: &EthSendBundle) -> RpcResult<()> {
168+
if bundle.txs.is_empty() {
169+
return Err(
170+
EthApiError::InvalidParams("Bundle cannot have empty transactions".into())
171+
.into_rpc_err(),
172+
);
173+
}
174+
175+
let mut total_gas = 0u64;
176+
for tx_data in &bundle.txs {
177+
let transaction = self.validate_tx(tx_data).await?;
178+
total_gas = total_gas.saturating_add(transaction.gas_limit());
179+
}
180+
181+
validate_bundle(bundle, total_gas)
182+
}
183+
}

crates/ingress-rpc/src/validation.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,20 @@ use alloy_consensus::private::alloy_eips::{BlockId, BlockNumberOrTag};
22
use alloy_consensus::{Transaction, Typed2718, constants::KECCAK_EMPTY, transaction::Recovered};
33
use alloy_primitives::{Address, B256, U256};
44
use alloy_provider::{Provider, RootProvider};
5+
use alloy_rpc_types_mev::EthSendBundle;
56
use async_trait::async_trait;
67
use jsonrpsee::core::RpcResult;
78
use op_alloy_consensus::interop::CROSS_L2_INBOX_ADDRESS;
89
use op_alloy_network::Optimism;
910
use op_revm::{OpSpecId, l1block::L1BlockInfo};
1011
use reth_optimism_evm::extract_l1_info_from_tx;
1112
use reth_rpc_eth_types::{EthApiError, RpcInvalidTransactionError, SignError};
13+
use std::time::{Duration, SystemTime, UNIX_EPOCH};
1214
use tracing::warn;
1315

16+
// TODO: make this configurable
17+
const MAX_BUNDLE_GAS: u64 = 30_000_000;
18+
1419
/// Account info for a given address
1520
pub struct AccountInfo {
1621
pub balance: U256,
@@ -158,17 +163,51 @@ pub async fn validate_tx<T: Transaction>(
158163
Ok(())
159164
}
160165

166+
/// Helper function to validate propeties of a bundle. A bundle is valid if it satisfies the following criteria:
167+
/// - The bundle's max_timestamp is not more than 1 hour in the future
168+
/// - The bundle's gas limit is not greater than the maximum allowed gas limit
169+
pub fn validate_bundle(bundle: &EthSendBundle, bundle_gas: u64) -> RpcResult<()> {
170+
// Don't allow bundles to be submitted over 1 hour into the future
171+
// TODO: make the window configurable
172+
let valid_timestamp_window = SystemTime::now()
173+
.duration_since(UNIX_EPOCH)
174+
.unwrap()
175+
.as_secs()
176+
+ Duration::from_secs(3600).as_secs();
177+
if let Some(max_timestamp) = bundle.max_timestamp
178+
&& max_timestamp > valid_timestamp_window
179+
{
180+
return Err(EthApiError::InvalidParams(
181+
"Bundle cannot be more than 1 hour in the future".into(),
182+
)
183+
.into_rpc_err());
184+
}
185+
186+
// Check max gas limit for the entire bundle
187+
if bundle_gas > MAX_BUNDLE_GAS {
188+
return Err(
189+
EthApiError::InvalidParams("Bundle gas limit exceeds maximum allowed".into())
190+
.into_rpc_err(),
191+
);
192+
}
193+
194+
Ok(())
195+
}
196+
161197
#[cfg(test)]
162198
mod tests {
163199
use super::*;
164200
use alloy_consensus::SignableTransaction;
165201
use alloy_consensus::{Transaction, constants::KECCAK_EMPTY, transaction::SignerRecoverable};
166202
use alloy_consensus::{TxEip1559, TxEip4844, TxEip7702};
203+
use alloy_primitives::Bytes;
167204
use alloy_primitives::{bytes, keccak256};
168205
use alloy_signer_local::PrivateKeySigner;
169206
use op_alloy_consensus::OpTxEnvelope;
170207
use op_alloy_network::TxSignerSync;
208+
use op_alloy_network::eip2718::Encodable2718;
171209
use revm_context_interface::transaction::{AccessList, AccessListItem};
210+
use std::time::{SystemTime, UNIX_EPOCH};
172211

173212
fn create_account(nonce: u64, balance: U256) -> AccountInfo {
174213
AccountInfo {
@@ -452,4 +491,75 @@ mod tests {
452491
.into_rpc_err())
453492
);
454493
}
494+
495+
#[tokio::test]
496+
async fn test_err_bundle_max_timestamp_too_far_in_the_future() {
497+
let current_time = SystemTime::now()
498+
.duration_since(UNIX_EPOCH)
499+
.unwrap()
500+
.as_secs();
501+
let too_far_in_the_future = current_time + 3601;
502+
let bundle = EthSendBundle {
503+
txs: vec![],
504+
max_timestamp: Some(too_far_in_the_future),
505+
..Default::default()
506+
};
507+
assert_eq!(
508+
validate_bundle(&bundle, 0),
509+
Err(EthApiError::InvalidParams(
510+
"Bundle cannot be more than 1 hour in the future".into()
511+
)
512+
.into_rpc_err())
513+
);
514+
}
515+
516+
#[tokio::test]
517+
async fn test_err_bundle_max_gas_limit_too_high() {
518+
let signer = PrivateKeySigner::random();
519+
let mut encoded_txs = vec![];
520+
521+
// Create transactions that collectively exceed MAX_BUNDLE_GAS (30M)
522+
// Each transaction uses 4M gas, so 8 transactions = 32M gas > 30M limit
523+
let gas = 4_000_000;
524+
let mut total_gas = 0u64;
525+
for _ in 0..8 {
526+
let mut tx = TxEip1559 {
527+
chain_id: 1,
528+
nonce: 0,
529+
gas_limit: gas,
530+
max_fee_per_gas: 200000u128,
531+
max_priority_fee_per_gas: 100000u128,
532+
to: Address::random().into(),
533+
value: U256::from(1000000u128),
534+
access_list: Default::default(),
535+
input: bytes!("").clone(),
536+
};
537+
total_gas = total_gas.saturating_add(gas);
538+
539+
let signature = signer.sign_transaction_sync(&mut tx).unwrap();
540+
let envelope = OpTxEnvelope::Eip1559(tx.into_signed(signature));
541+
542+
// Encode the transaction
543+
let mut encoded = vec![];
544+
envelope.encode_2718(&mut encoded);
545+
encoded_txs.push(Bytes::from(encoded));
546+
}
547+
548+
let bundle = EthSendBundle {
549+
txs: encoded_txs,
550+
block_number: 0,
551+
min_timestamp: None,
552+
max_timestamp: None,
553+
reverting_tx_hashes: vec![],
554+
..Default::default()
555+
};
556+
557+
// Test should fail due to exceeding gas limit
558+
let result = validate_bundle(&bundle, total_gas);
559+
assert!(result.is_err());
560+
if let Err(e) = result {
561+
let error_message = format!("{e:?}");
562+
assert!(error_message.contains("Bundle gas limit exceeds maximum allowed"));
563+
}
564+
}
455565
}

0 commit comments

Comments
 (0)