Skip to content

Commit

Permalink
mempool: modify maintain_transaction_pool to prune txs (#1485)
Browse files Browse the repository at this point in the history
* mempool: modify maintain_transaction_pool to prune txs

* clean up

* update pruning time

* fix comment

* fix comment

* fix prune time check

* fix comments

* fix conflicts

* fix

* fix

* fix
  • Loading branch information
tcoratger authored Oct 28, 2024
1 parent 8889ae9 commit 129b6f8
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 15 deletions.
10 changes: 6 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::{env::var, str::FromStr, sync::Arc};

use dotenvy::dotenv;
use eyre::Result;
use kakarot_rpc::{
client::EthClient,
constants::{KAKAROT_RPC_CONFIG, RPC_CONFIG},
eth_rpc::{rpc::KakarotRpcModuleBuilder, run_server},
pool::mempool::{maintain_transaction_pool, AccountManager},
pool::{
constants::PRUNE_DURATION,
mempool::{maintain_transaction_pool, AccountManager},
},
providers::eth_provider::{
database::Database,
starknet::kakarot_core::{core::KakarotCoreReader, KAKAROT_ADDRESS},
Expand All @@ -19,6 +20,7 @@ use starknet::{
core::types::{BlockId, BlockTag, Felt},
providers::{jsonrpc::HttpTransport, JsonRpcClient},
};
use std::{env::var, str::FromStr, sync::Arc};
use tracing_opentelemetry::MetricsLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};

Expand Down Expand Up @@ -64,7 +66,7 @@ async fn main() -> Result<()> {
AccountManager::from_addresses(addresses, Arc::clone(&eth_client)).await?.start();

// Start the maintenance of the mempool
maintain_transaction_pool(Arc::clone(&eth_client));
maintain_transaction_pool(Arc::clone(&eth_client), PRUNE_DURATION);

// Setup the RPC module
let kakarot_rpc_module = KakarotRpcModuleBuilder::new(eth_client).rpc_module()?;
Expand Down
5 changes: 5 additions & 0 deletions src/pool/constants.rs
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
use std::time::Duration;

pub(super) static ONE_TENTH_ETH: u64 = 10u64.pow(17);

// Transactions should be pruned after 5 minutes in the mempool
pub const PRUNE_DURATION: Duration = Duration::from_secs(300);
46 changes: 42 additions & 4 deletions src/pool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use starknet::{
providers::{jsonrpc::HttpTransport, JsonRpcClient, ProviderRequestData, ProviderResponseData},
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::Mutex;
use tokio::{sync::Mutex, time::Instant};
use tracing::instrument;

/// A type alias for the Kakarot Transaction Validator.
Expand Down Expand Up @@ -282,13 +282,27 @@ where

/// Maintains the transaction pool by periodically polling the database in order to
/// fetch the latest block and mark the block's transactions as mined by the node.
pub fn maintain_transaction_pool<SP>(eth_client: Arc<EthClient<SP>>)
pub fn maintain_transaction_pool<SP>(eth_client: Arc<EthClient<SP>>, prune_duration: Duration)
where
SP: starknet::providers::Provider + Send + Sync + Clone + 'static,
{
tokio::spawn(async move {
let mut block_number = 0u64;

// Mapping to store the transactions in the mempool with a timestamp to potentially prune them
let mut mempool_transactions = HashMap::new();

loop {
// Adding the transactions to the mempool mapping with a timestamp
for tx in eth_client
.mempool()
.queued_transactions()
.into_iter()
.chain(eth_client.mempool().pending_transactions())
{
mempool_transactions.entry(*tx.hash()).or_insert_with(Instant::now);
}

// Fetch the latest block number
let Ok(current_block_number) = eth_client.eth_provider().block_number().await else {
tracing::error!(target: "maintain_transaction_pool", "failed to fetch current block number");
Expand Down Expand Up @@ -319,7 +333,7 @@ where
chain_spec.base_fee_params_at_timestamp(latest_header.timestamp + 12),
)
.unwrap_or_default(),
pending_blob_fee: latest_header.next_block_blob_fee(),
pending_blob_fee: None,
};
eth_client.mempool().set_block_info(info);

Expand All @@ -338,7 +352,31 @@ where
}

let sealed_block = latest_block.seal(hash);
let mined_transactions = sealed_block.body.transactions.iter().map(|tx| tx.hash).collect();
let mut mined_transactions: Vec<_> =
sealed_block.body.transactions.iter().map(|tx| tx.hash).collect();

// Prune mined transactions from the mempool mapping
for tx_hash in &mined_transactions {
mempool_transactions.remove(tx_hash);
}

// Prune transactions that have been in the mempool for more than 5 minutes
let now = Instant::now();

for (tx_hash, timestamp) in mempool_transactions.clone() {
// - If the transaction has been in the mempool for more than 5 minutes
// - And the transaction is in the mempool right now
if now.duration_since(timestamp) > prune_duration && eth_client.mempool().contains(&tx_hash)
{
tracing::warn!(target: "maintain_transaction_pool", ?tx_hash, "pruning");

// Add the transaction to the mined transactions so that it can be pruned
mined_transactions.push(tx_hash);

// Remove the transaction from the mempool mapping
mempool_transactions.remove(&tx_hash);
}
}

// Canonical update
let update = CanonicalStateUpdate {
Expand Down
2 changes: 1 addition & 1 deletion src/pool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
mod constants;
pub mod constants;
pub mod mempool;
pub mod validate;
2 changes: 1 addition & 1 deletion src/providers/eth_provider/database/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl<T: Default> EthDatabaseFilterBuilder<T> {
}
}

pub(crate) fn format_hex(value: impl LowerHex, width: usize) -> String {
pub fn format_hex(value: impl LowerHex, width: usize) -> String {
// Add 2 to the width to account for the 0x prefix.
let s = format!("{:#0width$x}", value, width = width + 2);
// `s.len() < width` can happen because of the LowerHex implementation
Expand Down
128 changes: 123 additions & 5 deletions tests/tests/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,35 @@
#![allow(clippy::used_underscore_binding)]
#![cfg(feature = "testing")]
use alloy_consensus::TxEip1559;
use alloy_consensus::{TxEip1559, EMPTY_ROOT_HASH};
use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::{Address, TxKind, U256};
use alloy_primitives::{Address, TxKind, B64, U256};
use alloy_rpc_types::Header;
use kakarot_rpc::{
providers::eth_provider::{error::SignatureError, ChainProvider},
pool::mempool::maintain_transaction_pool,
providers::eth_provider::{
constant::U64_HEX_STRING_LEN,
database::{
filter::{self, format_hex, EthDatabaseFilterBuilder},
types::header::StoredHeader,
},
error::SignatureError,
ChainProvider,
},
test_utils::{
eoa::Eoa,
fixtures::{katana_empty, setup},
fixtures::{katana, katana_empty, setup},
katana::Katana,
},
};
use mongodb::{
bson::doc,
options::{UpdateModifications, UpdateOptions},
};
use reth_primitives::{sign_message, Transaction, TransactionSigned, TransactionSignedEcRecovered};
use reth_transaction_pool::{EthPooledTransaction, TransactionOrigin, TransactionPool};
use reth_transaction_pool::{EthPooledTransaction, PoolTransaction, TransactionOrigin, TransactionPool};
use revm_primitives::B256;
use rstest::*;
use std::{sync::Arc, time::Duration};

#[rstest]
#[awt]
Expand Down Expand Up @@ -351,3 +367,105 @@ pub async fn create_sample_transactions(
}
Ok(transactions)
}

#[rstest]
#[awt]
#[tokio::test(flavor = "multi_thread")]
async fn test_maintain_mempool(#[future] katana: Katana, _setup: ()) {
let eth_client = Arc::new(katana.eth_client());

// Create two sample transactions at once
let transactions = create_sample_transactions(&katana, 2).await.expect("Failed to create sample transactions");

// Extract and ensure we have two valid transactions from the transaction list.
let ((transaction1, _), (transaction2, _)) = (
transactions.first().expect("Expected at least one transaction").clone(),
transactions.get(1).expect("Expected at least two transactions").clone(),
);

// Add transactions to the mempool
eth_client.mempool().add_transaction(TransactionOrigin::Private, transaction1.clone()).await.unwrap();
eth_client.mempool().add_transaction(TransactionOrigin::Private, transaction2.clone()).await.unwrap();

// Start maintaining the transaction pool
//
// This task will periodically prune the mempool based on the given prune_duration.
// For testing purposes, we set the prune_duration to 100 milliseconds.
let prune_duration = Duration::from_millis(100);
let eth_client_clone = Arc::clone(&eth_client);
let maintain_task = tokio::spawn(async move {
maintain_transaction_pool(eth_client_clone, prune_duration);
});

// Initialize the block number based on the current blockchain state from katana.
let mut last_block_number = katana.block_number();

// Loop to simulate new blocks being added to the blockchain every 100 milliseconds.
for _ in 0..9 {
// Sleep for 10 milliseconds to simulate the passage of time between blocks.
tokio::time::sleep(Duration::from_millis(10)).await;

// Increment the block number to simulate the blockchain progressing.
last_block_number += 1;

// Format the block number in both padded and unpadded hexadecimal formats.
let unpadded_block_number = format_hex(last_block_number, 0);
let padded_block_number = format_hex(last_block_number, U64_HEX_STRING_LEN);

// Get the block header collection from the database.
let header_collection = eth_client.eth_provider().database().collection::<StoredHeader>();

// Build a filter for updating the header based on the new block number.
let filter = EthDatabaseFilterBuilder::<filter::Header>::default().with_block_number(last_block_number).build();

// Insert a new header for the new block number in the database.
eth_client
.eth_provider()
.database()
.update_one(
StoredHeader {
header: Header {
hash: B256::random(),
total_difficulty: Some(U256::default()),
mix_hash: Some(B256::default()),
nonce: Some(B64::default()),
withdrawals_root: Some(EMPTY_ROOT_HASH),
base_fee_per_gas: Some(0),
blob_gas_used: Some(0),
excess_blob_gas: Some(0),
number: last_block_number,
..Default::default()
},
},
filter,
true,
)
.await
.expect("Failed to update header in database");

// Update the header collection with the padded block number in the database.
header_collection
.update_one(
doc! {"header.number": unpadded_block_number},
UpdateModifications::Document(doc! {"$set": {"header.number": padded_block_number}}),
)
.with_options(UpdateOptions::builder().upsert(true).build())
.await
.expect("Failed to update block number");

// Check if both transactions are still in the mempool.
// We expect them to still be in the mempool until 1 second has elapsed.
assert!(eth_client.mempool().contains(transaction1.hash()), "Transaction 1 should still be in the mempool");
assert!(eth_client.mempool().contains(transaction2.hash()), "Transaction 2 should still be in the mempool");
}

// Sleep for some additional time to allow the pruning to occur.
tokio::time::sleep(Duration::from_millis(500)).await;

// Verify that both transactions have been pruned from the mempool after the pruning duration.
assert!(!eth_client.mempool().contains(transaction1.hash()), "Transaction 1 should be pruned after 1 second");
assert!(!eth_client.mempool().contains(transaction2.hash()), "Transaction 2 should be pruned after 1 second");

// Ensure the background task is stopped gracefully.
maintain_task.abort();
}

0 comments on commit 129b6f8

Please sign in to comment.