Skip to content

Commit

Permalink
re-activate txpool API (#1390)
Browse files Browse the repository at this point in the history
* re-activate txpool API

* clean up

* clean up

* clean up

* clean up

* clean up
  • Loading branch information
tcoratger authored Sep 20, 2024
1 parent 4f67743 commit 18e1c65
Show file tree
Hide file tree
Showing 10 changed files with 296 additions and 176 deletions.
41 changes: 38 additions & 3 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
database::Database,
error::{EthApiError, EthereumDataFormatError, KakarotError, SignatureError},
provider::{EthApiResult, EthDataProvider},
TxPoolProvider,
},
sn_provider::StarknetProvider,
},
Expand All @@ -16,12 +17,14 @@ use alloy_rlp::Decodable;
use async_trait::async_trait;
use num_traits::ToPrimitive;
use reth_chainspec::ChainSpec;
use reth_primitives::{Bytes, TransactionSigned, TransactionSignedEcRecovered, B256};
use reth_primitives::{Address, Bytes, TransactionSigned, TransactionSignedEcRecovered, B256};
use reth_rpc_types::{txpool::TxpoolContent, Transaction};
use reth_transaction_pool::{
blobstore::NoopBlobStore, EthPooledTransaction, PoolConfig, TransactionOrigin, TransactionPool,
blobstore::NoopBlobStore, AllPoolTransactions, EthPooledTransaction, PoolConfig, PoolTransaction,
TransactionOrigin, TransactionPool,
};
use starknet::{core::types::Felt, providers::Provider};
use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};

#[async_trait]
pub trait KakarotTransactions {
Expand Down Expand Up @@ -109,3 +112,35 @@ where
Ok(hash)
}
}

#[async_trait]
impl<SP> TxPoolProvider for EthClient<SP>
where
SP: starknet::providers::Provider + Send + Sync,
{
fn content(&self) -> TxpoolContent {
#[inline]
fn insert<T: PoolTransaction>(tx: &T, content: &mut BTreeMap<Address, BTreeMap<String, Transaction>>) {
content.entry(tx.sender()).or_default().insert(
tx.nonce().to_string(),
reth_rpc_types_compat::transaction::from_recovered(tx.to_recovered_transaction()),
);
}

let AllPoolTransactions { pending, queued } = self.pool.all_transactions();

let mut content = TxpoolContent::default();
for pending in pending {
insert(&pending.transaction, &mut content.pending);
}
for queued in queued {
insert(&queued.transaction, &mut content.queued);
}

content
}

async fn txpool_content(&self) -> EthApiResult<TxpoolContent> {
Ok(self.content())
}
}
2 changes: 1 addition & 1 deletion src/eth_rpc/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ where
let eth_provider = eth_client.eth_provider().clone();

let alchemy_provider = Arc::new(AlchemyDataProvider::new(eth_provider.clone()));
let pool_provider = Arc::new(PoolDataProvider::new(eth_provider.clone()));
let pool_provider = Arc::new(PoolDataProvider::new(eth_client.clone()));
let debug_provider = Arc::new(DebugDataProvider::new(eth_provider.clone()));

let eth_rpc_module = EthRpc::new(eth_client).into_rpc();
Expand Down
17 changes: 4 additions & 13 deletions src/providers/eth_provider/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ use crate::{
into_via_try_wrapper, into_via_wrapper,
models::block::{EthBlockId, EthBlockNumberOrTag},
providers::{
eth_provider::{
BlockProvider, GasProvider, LogProvider, ReceiptProvider, StateProvider, TransactionProvider,
TxPoolProvider,
},
eth_provider::{BlockProvider, GasProvider, LogProvider, ReceiptProvider, StateProvider, TransactionProvider},
sn_provider::StarknetProvider,
},
};
Expand Down Expand Up @@ -47,18 +44,12 @@ pub type EthApiResult<T> = Result<T, EthApiError>;

/// A trait that defines the interface for an Ethereum Provider.
pub trait EthereumProvider:
GasProvider + StateProvider + TransactionProvider + ReceiptProvider + LogProvider + TxPoolProvider + BlockProvider
GasProvider + StateProvider + TransactionProvider + ReceiptProvider + LogProvider + BlockProvider
{
}

impl<T> EthereumProvider for T where
T: GasProvider
+ StateProvider
+ TransactionProvider
+ ReceiptProvider
+ LogProvider
+ TxPoolProvider
+ BlockProvider
T: GasProvider + StateProvider + TransactionProvider + ReceiptProvider + LogProvider + BlockProvider
{
}

Expand All @@ -69,7 +60,7 @@ impl<T> EthereumProvider for T where
pub struct EthDataProvider<SP: starknet::providers::Provider + Send + Sync> {
database: Database,
starknet_provider: StarknetProvider<SP>,
pub(crate) chain_id: u64,
pub chain_id: u64,
}

impl<SP> EthDataProvider<SP>
Expand Down
25 changes: 3 additions & 22 deletions src/providers/eth_provider/tx_pool.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,16 @@
use crate::providers::eth_provider::provider::{EthApiResult, EthDataProvider};
use crate::providers::eth_provider::provider::EthApiResult;
use async_trait::async_trait;
use auto_impl::auto_impl;
use mongodb::bson::doc;
use reth_rpc_types::{txpool::TxpoolContent, Transaction};
use reth_rpc_types::txpool::TxpoolContent;

/// Ethereum provider trait. Used to abstract away the database and the network.
#[async_trait]
#[auto_impl(Arc, &)]
pub trait TxPoolProvider {
/// Returns a vec of pending pool transactions.
async fn txpool_transactions(&self) -> EthApiResult<Vec<Transaction>>;
fn content(&self) -> TxpoolContent;

/// Returns the content of the pending pool.
async fn txpool_content(&self) -> EthApiResult<TxpoolContent>;
}

#[async_trait]
impl<SP> TxPoolProvider for EthDataProvider<SP>
where
SP: starknet::providers::Provider + Send + Sync,
{
async fn txpool_transactions(&self) -> EthApiResult<Vec<Transaction>> {
// let span = tracing::span!(tracing::Level::INFO, "sn::txpool");
// TODO: we need certainly to move this implementation and rely on the mempool to check this
Ok(vec![])
}

async fn txpool_content(&self) -> EthApiResult<TxpoolContent> {
Ok(self.txpool_transactions().await?.into_iter().fold(TxpoolContent::default(), |mut content, pending| {
content.pending.entry(pending.from).or_default().insert(pending.nonce.to_string(), pending);
content
}))
}
}
51 changes: 35 additions & 16 deletions src/providers/pool_provider.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::providers::eth_provider::provider::{EthApiResult, EthereumProvider};
use super::eth_provider::TxPoolProvider;
use crate::providers::eth_provider::provider::EthApiResult;
use async_trait::async_trait;
use auto_impl::auto_impl;
use reth_primitives::Address;
Expand All @@ -14,18 +15,18 @@ pub trait PoolProvider {
}

#[derive(Debug, Clone)]
pub struct PoolDataProvider<P: EthereumProvider> {
pub struct PoolDataProvider<P: TxPoolProvider> {
eth_provider: P,
}

impl<P: EthereumProvider> PoolDataProvider<P> {
impl<P: TxPoolProvider> PoolDataProvider<P> {
pub const fn new(eth_provider: P) -> Self {
Self { eth_provider }
}
}

#[async_trait]
impl<P: EthereumProvider + Send + Sync + 'static> PoolProvider for PoolDataProvider<P> {
impl<P: TxPoolProvider + Send + Sync + 'static> PoolProvider for PoolDataProvider<P> {
async fn txpool_status(&self) -> EthApiResult<TxpoolStatus> {
let all = self.eth_provider.txpool_content().await?;
Ok(TxpoolStatus { pending: all.pending.len() as u64, queued: all.queued.len() as u64 })
Expand All @@ -34,18 +35,36 @@ impl<P: EthereumProvider + Send + Sync + 'static> PoolProvider for PoolDataProvi
async fn txpool_inspect(&self) -> EthApiResult<TxpoolInspect> {
let mut inspect = TxpoolInspect::default();

let transactions = self.eth_provider.txpool_transactions().await?;

for transaction in transactions {
inspect.pending.entry(transaction.from).or_default().insert(
transaction.nonce.to_string(),
TxpoolInspectSummary {
to: transaction.to,
value: transaction.value,
gas: transaction.gas,
gas_price: transaction.gas_price.unwrap_or_default(),
},
);
let transactions = self.eth_provider.content();

// Organize the pending transactions in the inspect summary struct.
for (sender, nonce_transaction) in transactions.pending {
for (nonce, transaction) in nonce_transaction {
inspect.pending.entry((*sender).into()).or_default().insert(
nonce.clone(),
TxpoolInspectSummary {
to: transaction.to,
value: transaction.value,
gas: transaction.gas,
gas_price: transaction.gas_price.unwrap_or_default(),
},
);
}
}

// Organize the queued transactions in the inspect summary struct.
for (sender, nonce_transaction) in transactions.queued {
for (nonce, transaction) in nonce_transaction {
inspect.queued.entry((*sender).into()).or_default().insert(
nonce.clone(),
TxpoolInspectSummary {
to: transaction.to,
value: transaction.value,
gas: transaction.gas,
gas_price: transaction.gas_price.unwrap_or_default(),
},
);
}
}

Ok(inspect)
Expand Down
8 changes: 8 additions & 0 deletions src/test_utils/fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ pub async fn katana() -> Katana {
Katana::new(RANDOM_BYTES_SIZE).await
}

/// This fixture creates a new test environment on Katana.
#[cfg(any(test, feature = "arbitrary", feature = "testing"))]
#[fixture]
pub async fn katana_empty() -> Katana {
// Create a new test environment on Katana
Katana::new_empty().await
}

/// This fixture configures the tests. The following setup
/// is used:
/// - The log level is set to `info`
Expand Down
54 changes: 54 additions & 0 deletions src/test_utils/katana/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,60 @@ impl<'a> Katana {
Self::initialize(sequencer, starknet_provider, rnd_bytes_size).await
}

#[cfg(any(test, feature = "arbitrary", feature = "testing"))]
pub async fn new_empty() -> Self {
use reth_primitives::{constants::EMPTY_ROOT_HASH, B64, U256};

let sequencer = katana_sequencer().await;
let starknet_provider = Arc::new(JsonRpcClient::new(HttpTransport::new(sequencer.url())));

// Load the private key from the environment variables.
dotenvy::dotenv().expect("Failed to load .env file");
let pk = std::env::var("EVM_PRIVATE_KEY").expect("Failed to get EVM private key");
let pk = B256::from_str(&pk).expect("Failed to parse EVM private key");

// Set the relayer private key in the environment variables.
std::env::set_var("RELAYER_PRIVATE_KEY", format!("0x{:x}", sequencer.raw_account().private_key));

// Initialize a MongoFuzzer instance with the specified random bytes size.
let mut mongo_fuzzer = MongoFuzzer::new(0).await;
mongo_fuzzer.headers.push(StoredHeader {
header: Header {
hash: Some(B256::random()),
total_difficulty: Some(U256::default()),
mix_hash: Some(B256::default()),
nonce: Some(B64::default()),
withdrawals_root: Some(EMPTY_ROOT_HASH),
base_fee_per_gas: Some(0),
blob_gas_used: Some(0),
excess_blob_gas: Some(0),
number: Some(0),
..Default::default()
},
});

// Finalize the empty MongoDB database initialization and get the database instance.
let database = mongo_fuzzer.finalize().await;

// Initialize the EthClient
let eth_client = EthClient::try_new(starknet_provider, database).await.expect("failed to start eth client");

// Create a new Kakarot EOA instance with the private key and EthDataProvider instance.
let eoa = KakarotEOA::new(pk, Arc::new(eth_client.clone()));

// Return a new instance of Katana with initialized fields.
Self {
sequencer,
eoa,
eth_client,
container: Some(mongo_fuzzer.container),
transactions: mongo_fuzzer.transactions,
receipts: mongo_fuzzer.receipts,
logs: mongo_fuzzer.logs,
headers: mongo_fuzzer.headers,
}
}

/// Initializes the Katana test environment.
#[cfg(any(test, feature = "arbitrary", feature = "testing"))]
async fn initialize(
Expand Down
13 changes: 2 additions & 11 deletions src/test_utils/mock_provider.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use crate::providers::eth_provider::{
provider::EthApiResult, BlockProvider, ChainProvider, GasProvider, LogProvider, ReceiptProvider, StateProvider,
TransactionProvider, TxPoolProvider,
TransactionProvider,
};
use async_trait::async_trait;
use mockall::mock;
use reth_primitives::{Address, BlockId, BlockNumberOrTag, Bytes, B256, U256, U64};
use reth_rpc_types::{
txpool::TxpoolContent, Filter, FilterChanges, Header, SyncStatus, TransactionReceipt, TransactionRequest,
};
use reth_rpc_types::{Filter, FilterChanges, Header, SyncStatus, TransactionReceipt, TransactionRequest};

mock! {
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -80,11 +78,4 @@ mock! {

async fn transaction_count(&self, address: Address, block_id: Option<BlockId>) -> EthApiResult<U256>;
}

#[async_trait]
impl TxPoolProvider for EthereumProviderStruct {
async fn txpool_transactions(&self) -> EthApiResult<Vec<reth_rpc_types::Transaction>>;

async fn txpool_content(&self) -> EthApiResult<TxpoolContent>;
}
}
2 changes: 1 addition & 1 deletion tests/tests/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ async fn test_mempool_get_private_transactions(#[future] katana: Katana, _setup:
}

// Helper function to create a sample transaction
async fn create_sample_transactions(
pub async fn create_sample_transactions(
katana: &Katana,
num_transactions: usize,
) -> Result<Vec<(EthPooledTransaction, TransactionSigned)>, SignatureError> {
Expand Down
Loading

0 comments on commit 18e1c65

Please sign in to comment.