20 changes: 10 additions & 10 deletions crates/ingress-rpc/src/queue.rs
@@ -1,4 +1,4 @@
use alloy_primitives::Address;
use alloy_primitives::B256;
use alloy_rpc_types_mev::EthSendBundle;
use anyhow::{Error, Result};
use async_trait::async_trait;
@@ -10,7 +10,7 @@ use tracing::{error, info};
/// A queue to buffer transactions
#[async_trait]
pub trait QueuePublisher: Send + Sync {
async fn publish(&self, bundle: &EthSendBundle, sender: Address) -> Result<()>;
async fn publish(&self, bundle: &EthSendBundle, bundle_hash: &B256) -> Result<()>;
}

/// A queue to buffer transactions
@@ -27,9 +27,9 @@ impl KafkaQueuePublisher {
pub async fn enqueue_bundle(
&self,
bundle: &EthSendBundle,
sender: Address,
bundle_hash: &B256,
) -> Result<(), Error> {
let key = sender.to_string();
let key = bundle_hash.to_string();
let payload = serde_json::to_vec(bundle)?;

let enqueue = || async {
@@ -38,7 +38,7 @@ impl KafkaQueuePublisher {
match self.producer.send(record, Duration::from_secs(5)).await {
Ok((partition, offset)) => {
info!(
sender = %sender,
bundle_hash = %bundle_hash,
partition = partition,
offset = offset,
topic = %self.topic,
@@ -48,7 +48,7 @@
}
Err((err, _)) => {
error!(
sender = %sender,
bundle_hash = %bundle_hash,
error = %err,
topic = %self.topic,
"Failed to enqueue bundle"
@@ -74,8 +74,8 @@ impl KafkaQueuePublisher {

#[async_trait]
impl QueuePublisher for KafkaQueuePublisher {
async fn publish(&self, bundle: &EthSendBundle, sender: Address) -> Result<()> {
self.enqueue_bundle(bundle, sender).await
async fn publish(&self, bundle: &EthSendBundle, bundle_hash: &B256) -> Result<()> {
self.enqueue_bundle(bundle, bundle_hash).await
}
}

@@ -100,10 +100,10 @@ mod tests {

let publisher = KafkaQueuePublisher::new(producer, "tips-ingress-rpc".to_string());
let bundle = create_test_bundle();
let sender = Address::ZERO;
let bundle_hash = bundle.bundle_hash();

let start = Instant::now();
let result = publisher.enqueue_bundle(&bundle, sender).await;
let result = publisher.enqueue_bundle(&bundle, &bundle_hash).await;
let elapsed = start.elapsed();

// the backoff tries at minimum 100ms, so verify we tried at least once
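A minimal caller-side sketch of the updated QueuePublisher interface, keying by bundle hash instead of sender. Everything beyond the trait call itself (the helper function, its name, and the bundle construction) is illustrative and not part of this diff:

use alloy_primitives::Bytes;
use alloy_rpc_types_mev::EthSendBundle;

use crate::queue::QueuePublisher;

// Wrap a raw signed transaction in a one-element bundle and enqueue it.
async fn enqueue_single_tx<Q: QueuePublisher>(publisher: &Q, raw_tx: Bytes) -> anyhow::Result<()> {
    let bundle = EthSendBundle {
        txs: vec![raw_tx],
        ..Default::default()
    };

    // The Kafka record key is now derived from the bundle hash, so repeated
    // publishes of the same bundle land on the same partition regardless of sender.
    let bundle_hash = bundle.bundle_hash();
    publisher.publish(&bundle, &bundle_hash).await
}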
98 changes: 70 additions & 28 deletions crates/ingress-rpc/src/service.rs
@@ -1,5 +1,5 @@
use crate::validation::{AccountInfoLookup, L1BlockInfoLookup, validate_tx};
use alloy_consensus::transaction::SignerRecoverable;
use alloy_consensus::transaction::Recovered;
use alloy_consensus::{Transaction, transaction::SignerRecoverable};
use alloy_primitives::{B256, Bytes};
use alloy_provider::{Provider, RootProvider, network::eip2718::Decodable2718};
use alloy_rpc_types_mev::{EthBundleHash, EthCancelBundle, EthSendBundle};
@@ -14,6 +14,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{info, warn};

use crate::queue::QueuePublisher;
use crate::validation::{AccountInfoLookup, L1BlockInfoLookup, validate_bundle, validate_tx};

#[rpc(server, namespace = "eth")]
pub trait IngressApi {
@@ -58,12 +59,25 @@ impl<Queue> IngressApiServer for IngressService<Queue>
where
Queue: QueuePublisher + Sync + Send + 'static,
{
async fn send_bundle(&self, _bundle: EthSendBundle) -> RpcResult<EthBundleHash> {
warn!(
message = "TODO: implement send_bundle",
method = "send_bundle"
async fn send_bundle(&self, bundle: EthSendBundle) -> RpcResult<EthBundleHash> {
self.validate_bundle(&bundle).await?;

// Queue the bundle
let bundle_hash = bundle.bundle_hash();
if let Err(e) = self.queue.publish(&bundle, &bundle_hash).await {
warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e);
return Err(EthApiError::InvalidParams("Failed to queue bundle".into()).into_rpc_err());
}

info!(
message = "queued bundle",
bundle_hash = %bundle_hash,
tx_count = bundle.txs.len(),
);
todo!("implement send_bundle")

Ok(EthBundleHash {
bundle_hash: bundle.bundle_hash(),
})
}

async fn cancel_bundle(&self, _request: EthCancelBundle) -> RpcResult<()> {
@@ -75,24 +89,7 @@
}

async fn send_raw_transaction(&self, data: Bytes) -> RpcResult<B256> {
if data.is_empty() {
return Err(EthApiError::EmptyRawTransactionData.into_rpc_err());
}

let envelope = OpTxEnvelope::decode_2718_exact(data.iter().as_slice())
.map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?;

let transaction = envelope
.clone()
.try_into_recovered()
.map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?;

let mut l1_block_info = self.provider.fetch_l1_block_info().await?;
let account = self
.provider
.fetch_account_info(transaction.signer())
.await?;
validate_tx(account, &transaction, &data, &mut l1_block_info).await?;
let transaction = self.validate_tx(&data).await?;

let expiry_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
@@ -110,9 +107,9 @@
};

// queue the bundle
let sender = transaction.signer();
if let Err(e) = self.queue.publish(&bundle, sender).await {
warn!(message = "Failed to publish Queue::enqueue_bundle", sender = %sender, error = %e);
let bundle_hash = bundle.bundle_hash();
if let Err(e) = self.queue.publish(&bundle, &bundle_hash).await {
warn!(message = "Failed to publish Queue::enqueue_bundle", bundle_hash = %bundle_hash, error = %e);
}

info!(message="queued singleton bundle", txn_hash=%transaction.tx_hash());
@@ -139,3 +136,48 @@
Ok(transaction.tx_hash())
}
}

impl<Queue> IngressService<Queue>
where
Queue: QueuePublisher + Sync + Send + 'static,
{
async fn validate_tx(&self, data: &Bytes) -> RpcResult<Recovered<OpTxEnvelope>> {
if data.is_empty() {
return Err(EthApiError::EmptyRawTransactionData.into_rpc_err());
}

let envelope = OpTxEnvelope::decode_2718_exact(data.iter().as_slice())
.map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?;

let transaction = envelope
.clone()
.try_into_recovered()
.map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?;

let mut l1_block_info = self.provider.fetch_l1_block_info().await?;
let account = self
.provider
.fetch_account_info(transaction.signer())
.await?;
validate_tx(account, &transaction, data, &mut l1_block_info).await?;

Ok(transaction)
}

async fn validate_bundle(&self, bundle: &EthSendBundle) -> RpcResult<()> {
if bundle.txs.is_empty() {
return Err(
EthApiError::InvalidParams("Bundle cannot have empty transactions".into())
.into_rpc_err(),
);
}

let mut total_gas = 0u64;
for tx_data in &bundle.txs {
let transaction = self.validate_tx(tx_data).await?;
total_gas = total_gas.saturating_add(transaction.gas_limit());
}

validate_bundle(bundle, total_gas)
}
}
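Because IngressService is generic over QueuePublisher, the Kafka queue can be swapped out in tests. A hedged sketch of a trivial in-memory publisher, not part of this diff (assumes EthSendBundle implements Clone):

use std::sync::Mutex;

use alloy_primitives::B256;
use alloy_rpc_types_mev::EthSendBundle;
use async_trait::async_trait;

use crate::queue::QueuePublisher;

/// Records published bundles instead of sending them to Kafka.
#[derive(Default)]
struct InMemoryQueue {
    published: Mutex<Vec<(EthSendBundle, B256)>>,
}

#[async_trait]
impl QueuePublisher for InMemoryQueue {
    async fn publish(&self, bundle: &EthSendBundle, bundle_hash: &B256) -> anyhow::Result<()> {
        self.published
            .lock()
            .unwrap()
            .push((bundle.clone(), *bundle_hash));
        Ok(())
    }
}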
110 changes: 110 additions & 0 deletions crates/ingress-rpc/src/validation.rs
@@ -2,15 +2,20 @@ use alloy_consensus::private::alloy_eips::{BlockId, BlockNumberOrTag};
use alloy_consensus::{Transaction, Typed2718, constants::KECCAK_EMPTY, transaction::Recovered};
use alloy_primitives::{Address, B256, U256};
use alloy_provider::{Provider, RootProvider};
use alloy_rpc_types_mev::EthSendBundle;
use async_trait::async_trait;
use jsonrpsee::core::RpcResult;
use op_alloy_consensus::interop::CROSS_L2_INBOX_ADDRESS;
use op_alloy_network::Optimism;
use op_revm::{OpSpecId, l1block::L1BlockInfo};
use reth_optimism_evm::extract_l1_info_from_tx;
use reth_rpc_eth_types::{EthApiError, RpcInvalidTransactionError, SignError};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::warn;

// TODO: make this configurable
const MAX_BUNDLE_GAS: u64 = 30_000_000;

/// Account info for a given address
pub struct AccountInfo {
pub balance: U256,
@@ -158,17 +163,51 @@ pub async fn validate_tx<T: Transaction>(
Ok(())
}

/// Helper function to validate properties of a bundle. A bundle is valid if it satisfies the following criteria:
/// - The bundle's max_timestamp is not more than 1 hour in the future
/// - The bundle's gas limit is not greater than the maximum allowed gas limit
pub fn validate_bundle(bundle: &EthSendBundle, bundle_gas: u64) -> RpcResult<()> {
// Don't allow bundles to be submitted over 1 hour into the future
// TODO: make the window configurable
let valid_timestamp_window = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
+ Duration::from_secs(3600).as_secs();
if let Some(max_timestamp) = bundle.max_timestamp
&& max_timestamp > valid_timestamp_window
{
return Err(EthApiError::InvalidParams(
"Bundle cannot be more than 1 hour in the future".into(),
)
.into_rpc_err());
}

// Check max gas limit for the entire bundle
if bundle_gas > MAX_BUNDLE_GAS {
return Err(
EthApiError::InvalidParams("Bundle gas limit exceeds maximum allowed".into())
.into_rpc_err(),
);
}

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use alloy_consensus::SignableTransaction;
use alloy_consensus::{Transaction, constants::KECCAK_EMPTY, transaction::SignerRecoverable};
use alloy_consensus::{TxEip1559, TxEip4844, TxEip7702};
use alloy_primitives::Bytes;
use alloy_primitives::{bytes, keccak256};
use alloy_signer_local::PrivateKeySigner;
use op_alloy_consensus::OpTxEnvelope;
use op_alloy_network::TxSignerSync;
use op_alloy_network::eip2718::Encodable2718;
use revm_context_interface::transaction::{AccessList, AccessListItem};
use std::time::{SystemTime, UNIX_EPOCH};

fn create_account(nonce: u64, balance: U256) -> AccountInfo {
AccountInfo {
@@ -452,4 +491,75 @@ mod tests {
.into_rpc_err())
);
}

#[tokio::test]
async fn test_err_bundle_max_timestamp_too_far_in_the_future() {
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let too_far_in_the_future = current_time + 3601;
let bundle = EthSendBundle {
txs: vec![],
max_timestamp: Some(too_far_in_the_future),
..Default::default()
};
assert_eq!(
validate_bundle(&bundle, 0),
Err(EthApiError::InvalidParams(
"Bundle cannot be more than 1 hour in the future".into()
)
.into_rpc_err())
);
}

#[tokio::test]
async fn test_err_bundle_max_gas_limit_too_high() {
let signer = PrivateKeySigner::random();
let mut encoded_txs = vec![];

// Create transactions that collectively exceed MAX_BUNDLE_GAS (30M)
// Each transaction uses 4M gas, so 8 transactions = 32M gas > 30M limit
let gas = 4_000_000;
let mut total_gas = 0u64;
for _ in 0..8 {
let mut tx = TxEip1559 {
chain_id: 1,
nonce: 0,
gas_limit: gas,
max_fee_per_gas: 200000u128,
max_priority_fee_per_gas: 100000u128,
to: Address::random().into(),
value: U256::from(1000000u128),
access_list: Default::default(),
input: bytes!("").clone(),
};
total_gas = total_gas.saturating_add(gas);

let signature = signer.sign_transaction_sync(&mut tx).unwrap();
let envelope = OpTxEnvelope::Eip1559(tx.into_signed(signature));

// Encode the transaction
let mut encoded = vec![];
envelope.encode_2718(&mut encoded);
encoded_txs.push(Bytes::from(encoded));
}

let bundle = EthSendBundle {
txs: encoded_txs,
block_number: 0,
min_timestamp: None,
max_timestamp: None,
reverting_tx_hashes: vec![],
..Default::default()
};

// Test should fail due to exceeding gas limit
let result = validate_bundle(&bundle, total_gas);
assert!(result.is_err());
if let Err(e) = result {
let error_message = format!("{e:?}");
assert!(error_message.contains("Bundle gas limit exceeds maximum allowed"));
}
}
}
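The new tests cover only the failure paths; a hedged sketch of the complementary happy-path case (not part of this diff, following the same test conventions as above):

#[tokio::test]
async fn test_ok_bundle_within_limits() {
    let current_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    // Total gas stays under MAX_BUNDLE_GAS and max_timestamp is within the one-hour window.
    let bundle = EthSendBundle {
        txs: vec![],
        max_timestamp: Some(current_time + 60),
        ..Default::default()
    };
    assert!(validate_bundle(&bundle, 21_000).is_ok());
}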