From 129b6f8be997af68db3fde5ae353a734adc31ed7 Mon Sep 17 00:00:00 2001 From: Thomas Coratger <60488569+tcoratger@users.noreply.github.com> Date: Mon, 28 Oct 2024 18:01:48 +0100 Subject: [PATCH] mempool: modify `maintain_transaction_pool` to prune txs (#1485) * mempool: modify maintain_transaction_pool to prune txs * clean up * update pruning time * fix comment * fix comment * fix prune time check * fix comments * fix conflicts * fix * fix * fix --- src/main.rs | 10 +- src/pool/constants.rs | 5 + src/pool/mempool.rs | 46 ++++++- src/pool/mod.rs | 2 +- src/providers/eth_provider/database/filter.rs | 2 +- tests/tests/mempool.rs | 128 +++++++++++++++++- 6 files changed, 178 insertions(+), 15 deletions(-) diff --git a/src/main.rs b/src/main.rs index 888acb7ae..f5342c21a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,13 @@ -use std::{env::var, str::FromStr, sync::Arc}; - use dotenvy::dotenv; use eyre::Result; use kakarot_rpc::{ client::EthClient, constants::{KAKAROT_RPC_CONFIG, RPC_CONFIG}, eth_rpc::{rpc::KakarotRpcModuleBuilder, run_server}, - pool::mempool::{maintain_transaction_pool, AccountManager}, + pool::{ + constants::PRUNE_DURATION, + mempool::{maintain_transaction_pool, AccountManager}, + }, providers::eth_provider::{ database::Database, starknet::kakarot_core::{core::KakarotCoreReader, KAKAROT_ADDRESS}, @@ -19,6 +20,7 @@ use starknet::{ core::types::{BlockId, BlockTag, Felt}, providers::{jsonrpc::HttpTransport, JsonRpcClient}, }; +use std::{env::var, str::FromStr, sync::Arc}; use tracing_opentelemetry::MetricsLayer; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; @@ -64,7 +66,7 @@ async fn main() -> Result<()> { AccountManager::from_addresses(addresses, Arc::clone(ð_client)).await?.start(); // Start the maintenance of the mempool - maintain_transaction_pool(Arc::clone(ð_client)); + maintain_transaction_pool(Arc::clone(ð_client), PRUNE_DURATION); // Setup the RPC module let kakarot_rpc_module = KakarotRpcModuleBuilder::new(eth_client).rpc_module()?; diff --git a/src/pool/constants.rs b/src/pool/constants.rs index 752ce8384..9435f6a84 100644 --- a/src/pool/constants.rs +++ b/src/pool/constants.rs @@ -1 +1,6 @@ +use std::time::Duration; + pub(super) static ONE_TENTH_ETH: u64 = 10u64.pow(17); + +// Transactions should be pruned after 5 minutes in the mempool +pub const PRUNE_DURATION: Duration = Duration::from_secs(300); diff --git a/src/pool/mempool.rs b/src/pool/mempool.rs index 04528d6e2..468fe7d0a 100644 --- a/src/pool/mempool.rs +++ b/src/pool/mempool.rs @@ -24,7 +24,7 @@ use starknet::{ providers::{jsonrpc::HttpTransport, JsonRpcClient, ProviderRequestData, ProviderResponseData}, }; use std::{collections::HashMap, sync::Arc, time::Duration}; -use tokio::sync::Mutex; +use tokio::{sync::Mutex, time::Instant}; use tracing::instrument; /// A type alias for the Kakarot Transaction Validator. @@ -282,13 +282,27 @@ where /// Maintains the transaction pool by periodically polling the database in order to /// fetch the latest block and mark the block's transactions as mined by the node. -pub fn maintain_transaction_pool(eth_client: Arc>) +pub fn maintain_transaction_pool(eth_client: Arc>, prune_duration: Duration) where SP: starknet::providers::Provider + Send + Sync + Clone + 'static, { tokio::spawn(async move { let mut block_number = 0u64; + + // Mapping to store the transactions in the mempool with a timestamp to potentially prune them + let mut mempool_transactions = HashMap::new(); + loop { + // Adding the transactions to the mempool mapping with a timestamp + for tx in eth_client + .mempool() + .queued_transactions() + .into_iter() + .chain(eth_client.mempool().pending_transactions()) + { + mempool_transactions.entry(*tx.hash()).or_insert_with(Instant::now); + } + // Fetch the latest block number let Ok(current_block_number) = eth_client.eth_provider().block_number().await else { tracing::error!(target: "maintain_transaction_pool", "failed to fetch current block number"); @@ -319,7 +333,7 @@ where chain_spec.base_fee_params_at_timestamp(latest_header.timestamp + 12), ) .unwrap_or_default(), - pending_blob_fee: latest_header.next_block_blob_fee(), + pending_blob_fee: None, }; eth_client.mempool().set_block_info(info); @@ -338,7 +352,31 @@ where } let sealed_block = latest_block.seal(hash); - let mined_transactions = sealed_block.body.transactions.iter().map(|tx| tx.hash).collect(); + let mut mined_transactions: Vec<_> = + sealed_block.body.transactions.iter().map(|tx| tx.hash).collect(); + + // Prune mined transactions from the mempool mapping + for tx_hash in &mined_transactions { + mempool_transactions.remove(tx_hash); + } + + // Prune transactions that have been in the mempool for more than 5 minutes + let now = Instant::now(); + + for (tx_hash, timestamp) in mempool_transactions.clone() { + // - If the transaction has been in the mempool for more than 5 minutes + // - And the transaction is in the mempool right now + if now.duration_since(timestamp) > prune_duration && eth_client.mempool().contains(&tx_hash) + { + tracing::warn!(target: "maintain_transaction_pool", ?tx_hash, "pruning"); + + // Add the transaction to the mined transactions so that it can be pruned + mined_transactions.push(tx_hash); + + // Remove the transaction from the mempool mapping + mempool_transactions.remove(&tx_hash); + } + } // Canonical update let update = CanonicalStateUpdate { diff --git a/src/pool/mod.rs b/src/pool/mod.rs index 252056409..a9ec7c4ce 100644 --- a/src/pool/mod.rs +++ b/src/pool/mod.rs @@ -1,3 +1,3 @@ -mod constants; +pub mod constants; pub mod mempool; pub mod validate; diff --git a/src/providers/eth_provider/database/filter.rs b/src/providers/eth_provider/database/filter.rs index c1826d884..467682ae2 100644 --- a/src/providers/eth_provider/database/filter.rs +++ b/src/providers/eth_provider/database/filter.rs @@ -256,7 +256,7 @@ impl EthDatabaseFilterBuilder { } } -pub(crate) fn format_hex(value: impl LowerHex, width: usize) -> String { +pub fn format_hex(value: impl LowerHex, width: usize) -> String { // Add 2 to the width to account for the 0x prefix. let s = format!("{:#0width$x}", value, width = width + 2); // `s.len() < width` can happen because of the LowerHex implementation diff --git a/tests/tests/mempool.rs b/tests/tests/mempool.rs index 1c85aadc8..6468ddab0 100644 --- a/tests/tests/mempool.rs +++ b/tests/tests/mempool.rs @@ -1,19 +1,35 @@ #![allow(clippy::used_underscore_binding)] #![cfg(feature = "testing")] -use alloy_consensus::TxEip1559; +use alloy_consensus::{TxEip1559, EMPTY_ROOT_HASH}; use alloy_eips::eip2718::Encodable2718; -use alloy_primitives::{Address, TxKind, U256}; +use alloy_primitives::{Address, TxKind, B64, U256}; +use alloy_rpc_types::Header; use kakarot_rpc::{ - providers::eth_provider::{error::SignatureError, ChainProvider}, + pool::mempool::maintain_transaction_pool, + providers::eth_provider::{ + constant::U64_HEX_STRING_LEN, + database::{ + filter::{self, format_hex, EthDatabaseFilterBuilder}, + types::header::StoredHeader, + }, + error::SignatureError, + ChainProvider, + }, test_utils::{ eoa::Eoa, - fixtures::{katana_empty, setup}, + fixtures::{katana, katana_empty, setup}, katana::Katana, }, }; +use mongodb::{ + bson::doc, + options::{UpdateModifications, UpdateOptions}, +}; use reth_primitives::{sign_message, Transaction, TransactionSigned, TransactionSignedEcRecovered}; -use reth_transaction_pool::{EthPooledTransaction, TransactionOrigin, TransactionPool}; +use reth_transaction_pool::{EthPooledTransaction, PoolTransaction, TransactionOrigin, TransactionPool}; +use revm_primitives::B256; use rstest::*; +use std::{sync::Arc, time::Duration}; #[rstest] #[awt] @@ -351,3 +367,105 @@ pub async fn create_sample_transactions( } Ok(transactions) } + +#[rstest] +#[awt] +#[tokio::test(flavor = "multi_thread")] +async fn test_maintain_mempool(#[future] katana: Katana, _setup: ()) { + let eth_client = Arc::new(katana.eth_client()); + + // Create two sample transactions at once + let transactions = create_sample_transactions(&katana, 2).await.expect("Failed to create sample transactions"); + + // Extract and ensure we have two valid transactions from the transaction list. + let ((transaction1, _), (transaction2, _)) = ( + transactions.first().expect("Expected at least one transaction").clone(), + transactions.get(1).expect("Expected at least two transactions").clone(), + ); + + // Add transactions to the mempool + eth_client.mempool().add_transaction(TransactionOrigin::Private, transaction1.clone()).await.unwrap(); + eth_client.mempool().add_transaction(TransactionOrigin::Private, transaction2.clone()).await.unwrap(); + + // Start maintaining the transaction pool + // + // This task will periodically prune the mempool based on the given prune_duration. + // For testing purposes, we set the prune_duration to 100 milliseconds. + let prune_duration = Duration::from_millis(100); + let eth_client_clone = Arc::clone(ð_client); + let maintain_task = tokio::spawn(async move { + maintain_transaction_pool(eth_client_clone, prune_duration); + }); + + // Initialize the block number based on the current blockchain state from katana. + let mut last_block_number = katana.block_number(); + + // Loop to simulate new blocks being added to the blockchain every 100 milliseconds. + for _ in 0..9 { + // Sleep for 10 milliseconds to simulate the passage of time between blocks. + tokio::time::sleep(Duration::from_millis(10)).await; + + // Increment the block number to simulate the blockchain progressing. + last_block_number += 1; + + // Format the block number in both padded and unpadded hexadecimal formats. + let unpadded_block_number = format_hex(last_block_number, 0); + let padded_block_number = format_hex(last_block_number, U64_HEX_STRING_LEN); + + // Get the block header collection from the database. + let header_collection = eth_client.eth_provider().database().collection::(); + + // Build a filter for updating the header based on the new block number. + let filter = EthDatabaseFilterBuilder::::default().with_block_number(last_block_number).build(); + + // Insert a new header for the new block number in the database. + eth_client + .eth_provider() + .database() + .update_one( + StoredHeader { + header: Header { + hash: B256::random(), + total_difficulty: Some(U256::default()), + mix_hash: Some(B256::default()), + nonce: Some(B64::default()), + withdrawals_root: Some(EMPTY_ROOT_HASH), + base_fee_per_gas: Some(0), + blob_gas_used: Some(0), + excess_blob_gas: Some(0), + number: last_block_number, + ..Default::default() + }, + }, + filter, + true, + ) + .await + .expect("Failed to update header in database"); + + // Update the header collection with the padded block number in the database. + header_collection + .update_one( + doc! {"header.number": unpadded_block_number}, + UpdateModifications::Document(doc! {"$set": {"header.number": padded_block_number}}), + ) + .with_options(UpdateOptions::builder().upsert(true).build()) + .await + .expect("Failed to update block number"); + + // Check if both transactions are still in the mempool. + // We expect them to still be in the mempool until 1 second has elapsed. + assert!(eth_client.mempool().contains(transaction1.hash()), "Transaction 1 should still be in the mempool"); + assert!(eth_client.mempool().contains(transaction2.hash()), "Transaction 2 should still be in the mempool"); + } + + // Sleep for some additional time to allow the pruning to occur. + tokio::time::sleep(Duration::from_millis(500)).await; + + // Verify that both transactions have been pruned from the mempool after the pruning duration. + assert!(!eth_client.mempool().contains(transaction1.hash()), "Transaction 1 should be pruned after 1 second"); + assert!(!eth_client.mempool().contains(transaction2.hash()), "Transaction 2 should be pruned after 1 second"); + + // Ensure the background task is stopped gracefully. + maintain_task.abort(); +}