Skip to content

Commit

Permalink
rm useless StoredPendingTransaction struct (#1385)
Browse files Browse the repository at this point in the history
* rm useless StoredPendingTransaction struct

* fix tests
  • Loading branch information
tcoratger authored Sep 19, 2024
1 parent 67fb6cd commit 9622c0a
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 320 deletions.
81 changes: 1 addition & 80 deletions src/providers/eth_provider/database/ethereum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
use super::{
filter,
filter::EthDatabaseFilterBuilder,
types::{
header::StoredHeader,
transaction::{StoredPendingTransaction, StoredTransaction},
},
types::{header::StoredHeader, transaction::StoredTransaction},
Database,
};
use crate::providers::eth_provider::error::{EthApiError, EthereumDataFormatError};
Expand All @@ -25,16 +22,8 @@ pub trait EthereumTransactionStore {
async fn transaction(&self, hash: &B256) -> Result<Option<Transaction>, EthApiError>;
/// Returns all transactions for the given block hash or number.
async fn transactions(&self, block_hash_or_number: BlockHashOrNumber) -> Result<Vec<Transaction>, EthApiError>;
/// Returns the pending transaction with the given hash. Returns None if the
/// transaction is not found.
async fn pending_transaction(&self, hash: &B256) -> Result<Option<Transaction>, EthApiError>;
/// Returns the pending transaction's retries with the given hash.
/// Returns 0 if the transaction is not found.
async fn pending_transaction_retries(&self, hash: &B256) -> Result<u8, EthApiError>;
/// Upserts the given transaction.
async fn upsert_transaction(&self, transaction: Transaction) -> Result<(), EthApiError>;
/// Upserts the given transaction as a pending transaction with the given number of retries.
async fn upsert_pending_transaction(&self, transaction: Transaction, retries: u8) -> Result<(), EthApiError>;
}

#[async_trait]
Expand All @@ -54,29 +43,11 @@ impl EthereumTransactionStore for Database {
Ok(self.get::<StoredTransaction>(filter, None).await?.into_iter().map(Into::into).collect())
}

#[instrument(skip_all, name = "db::pending_transaction", err)]
async fn pending_transaction(&self, hash: &B256) -> Result<Option<Transaction>, EthApiError> {
let filter = EthDatabaseFilterBuilder::<filter::Transaction>::default().with_tx_hash(hash).build();
Ok(self.get_one::<StoredPendingTransaction>(filter, None).await?.map(Into::into))
}

#[instrument(skip_all, name = "db::pending_transaction_retries", err)]
async fn pending_transaction_retries(&self, hash: &B256) -> Result<u8, EthApiError> {
let filter = EthDatabaseFilterBuilder::<filter::Transaction>::default().with_tx_hash(hash).build();
Ok(self.get_one::<StoredPendingTransaction>(filter, None).await?.map(|tx| tx.retries + 1).unwrap_or_default())
}

#[instrument(skip_all, name = "db::upsert_transaction", err)]
async fn upsert_transaction(&self, transaction: Transaction) -> Result<(), EthApiError> {
let filter = EthDatabaseFilterBuilder::<filter::Transaction>::default().with_tx_hash(&transaction.hash).build();
Ok(self.update_one(StoredTransaction::from(transaction), filter, true).await?)
}

#[instrument(skip_all, name = "db::upsert_pending_transaction", err)]
async fn upsert_pending_transaction(&self, transaction: Transaction, retries: u8) -> Result<(), EthApiError> {
let filter = EthDatabaseFilterBuilder::<filter::Transaction>::default().with_tx_hash(&transaction.hash).build();
Ok(self.update_one(StoredPendingTransaction::new(transaction, retries), filter, true).await?)
}
}

/// Trait for interacting with a database that stores Ethereum typed
Expand Down Expand Up @@ -225,9 +196,6 @@ mod tests {
// Test fetching transactions by their block number
test_get_transactions_by_block_number(&database, &mongo_fuzzer).await;

// Test upserting pending transactions into the database
test_upsert_pending_transactions(&mut unstructured, &database).await;

// Test upserting transactions into the database
test_upsert_transactions(&mut unstructured, &database).await;
}
Expand Down Expand Up @@ -282,60 +250,13 @@ mod tests {
assert_eq!(database.transactions(first_block_number.into()).await.unwrap(), transactions_first_block_number);
}

async fn test_upsert_pending_transactions(unstructured: &mut arbitrary::Unstructured<'_>, database: &Database) {
// Generate 10 pending transactions and add them to the database
let pending_transactions: Vec<StoredPendingTransaction> =
(0..10).map(|_| StoredPendingTransaction::arbitrary(unstructured).unwrap()).collect();

// Add pending transactions to the database
for tx in &pending_transactions {
database
.upsert_pending_transaction(tx.into(), tx.retries)
.await
.expect("Failed to update pending transaction in database");
}

// Test retrieving a pending transaction by its hash
let first_pending_transaction = pending_transactions.first().unwrap();
assert_eq!(
database.pending_transaction(&first_pending_transaction.hash).await.unwrap(),
Some(first_pending_transaction.into())
);

// Test retrieving a non-existent pending transaction by its hash
let unstored_transaction = StoredTransaction::arbitrary(unstructured).unwrap();
assert_eq!(database.pending_transaction(&unstored_transaction.hash).await.unwrap(), None);

// Test retrieving the number of retries for a pending transaction
assert_eq!(
database.pending_transaction_retries(&first_pending_transaction.hash).await.unwrap(),
first_pending_transaction.clone().retries.saturating_add(1)
);

// Test retrieving the number of retries for a non-existent pending transaction
assert_eq!(database.pending_transaction_retries(&unstored_transaction.hash).await.unwrap(), 0);
}

async fn test_upsert_transactions(unstructured: &mut arbitrary::Unstructured<'_>, database: &Database) {
// Generate and upsert a mock transaction into the database
let mock_transaction = StoredTransaction::arbitrary(unstructured).unwrap();
database.upsert_transaction(mock_transaction.clone().tx).await.unwrap();

// Test retrieving an upserted transaction by its hash
assert_eq!(database.transaction(&mock_transaction.hash).await.unwrap(), Some(mock_transaction.into()));

// Generate and upsert a mock pending transaction into the database
let mock_pending_transaction = StoredPendingTransaction::arbitrary(unstructured).unwrap();
database
.upsert_pending_transaction(mock_pending_transaction.clone().tx, mock_pending_transaction.clone().retries)
.await
.unwrap();

// Test retrieving an upserted pending transaction by its hash
assert_eq!(
database.pending_transaction(&mock_pending_transaction.hash).await.unwrap(),
Some(mock_pending_transaction.tx)
);
}

#[tokio::test(flavor = "multi_thread")]
Expand Down
12 changes: 1 addition & 11 deletions src/providers/eth_provider/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ pub mod types;

use super::error::KakarotError;
use crate::providers::eth_provider::database::types::{
header::StoredHeader,
log::StoredLog,
receipt::StoredTransactionReceipt,
transaction::{StoredPendingTransaction, StoredTransaction},
header::StoredHeader, log::StoredLog, receipt::StoredTransactionReceipt, transaction::StoredTransaction,
};
use futures::TryStreamExt;
use itertools::Itertools;
Expand Down Expand Up @@ -225,13 +222,6 @@ impl CollectionName for StoredTransaction {
}
}

/// Implement [`CollectionName`] for [`StoredPendingTransaction`]
impl CollectionName for StoredPendingTransaction {
fn collection_name() -> &'static str {
"transactions_pending"
}
}

/// Implement [`CollectionName`] for [`StoredTransactionReceipt`]
impl CollectionName for StoredTransactionReceipt {
fn collection_name() -> &'static str {
Expand Down
42 changes: 0 additions & 42 deletions src/providers/eth_provider/database/types/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,48 +113,6 @@ impl Arbitrary<'_> for StoredTransaction {
}
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct StoredPendingTransaction {
/// Transaction object
#[serde(deserialize_with = "crate::providers::eth_provider::database::types::serde::deserialize_intermediate")]
pub tx: Transaction,
/// Number of retries
pub retries: u8,
}

impl StoredPendingTransaction {
pub const fn new(tx: Transaction, retries: u8) -> Self {
Self { tx, retries }
}
}

#[cfg(any(test, feature = "arbitrary", feature = "testing"))]
impl Arbitrary<'_> for StoredPendingTransaction {
fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
Ok(Self { tx: StoredTransaction::arbitrary(u)?.into(), retries: u8::arbitrary(u)? })
}
}

impl From<StoredPendingTransaction> for Transaction {
fn from(tx: StoredPendingTransaction) -> Self {
tx.tx
}
}

impl From<&StoredPendingTransaction> for Transaction {
fn from(tx: &StoredPendingTransaction) -> Self {
tx.tx.clone()
}
}

impl Deref for StoredPendingTransaction {
type Target = Transaction;

fn deref(&self) -> &Self::Target {
&self.tx
}
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Hash {
Expand Down
2 changes: 2 additions & 0 deletions src/providers/eth_provider/starknet/kakarot_core.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(clippy::too_many_arguments)]

use crate::into_via_wrapper;
use cainome::rs::abigen_legacy;
use dotenvy::dotenv;
Expand Down
10 changes: 4 additions & 6 deletions src/providers/eth_provider/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use super::{
constant::HASH_HEX_STRING_LEN,
database::{
filter::EthDatabaseFilterBuilder,
types::transaction::{StoredPendingTransaction, StoredTransaction},
CollectionName,
},
database::{filter::EthDatabaseFilterBuilder, types::transaction::StoredTransaction, CollectionName},
error::ExecutionError,
starknet::kakarot_core::{account_contract::AccountContractReader, starknet_address},
utils::{contract_not_found, entrypoint_not_found},
Expand Down Expand Up @@ -54,11 +50,13 @@ where
SP: starknet::providers::Provider + Send + Sync,
{
async fn transaction_by_hash(&self, hash: B256) -> EthApiResult<Option<reth_rpc_types::Transaction>> {
// TODO: modify this for the tests to pass because now we don't have a pending transactions collection anymore.
// TODO: So we need to remove the unionWith part and we need to search inside the final transactions collection + inside the mempool.
let pipeline = vec![
doc! {
// Union with pending transactions with only specified hash
"$unionWith": {
"coll": StoredPendingTransaction::collection_name(),
"coll": StoredTransaction::collection_name(),
"pipeline": [
{
"$match": {
Expand Down
7 changes: 3 additions & 4 deletions src/providers/eth_provider/tx_pool.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use super::database::types::transaction::StoredPendingTransaction;
use crate::providers::eth_provider::provider::{EthApiResult, EthDataProvider};
use async_trait::async_trait;
use auto_impl::auto_impl;
use mongodb::bson::doc;
use reth_rpc_types::{txpool::TxpoolContent, Transaction};
use tracing::Instrument;

/// Ethereum provider trait. Used to abstract away the database and the network.
#[async_trait]
Expand All @@ -23,8 +21,9 @@ where
SP: starknet::providers::Provider + Send + Sync,
{
async fn txpool_transactions(&self) -> EthApiResult<Vec<Transaction>> {
let span = tracing::span!(tracing::Level::INFO, "sn::txpool");
Ok(self.database().get_all_and_map_to::<Transaction, StoredPendingTransaction>().instrument(span).await?)
// let span = tracing::span!(tracing::Level::INFO, "sn::txpool");
// TODO: we need certainly to move this implementation and rely on the mempool to check this
Ok(vec![])
}

async fn txpool_content(&self) -> EthApiResult<TxpoolContent> {
Expand Down
11 changes: 0 additions & 11 deletions src/test_utils/katana/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,17 +211,6 @@ impl<'a> Katana {
.expect("Failed to insert logs into the database");
}

/// Adds pending transactions to the database.
pub async fn add_pending_transactions_to_database(&self, txs: Vec<Transaction>) {
let provider = self.eth_provider();
let database = provider.database();

// Add the transactions to the database.
for tx in txs {
database.upsert_pending_transaction(tx, 0).await.expect("Failed to update pending transaction in database");
}
}

/// Adds transactions to the database along with a corresponding header.
pub async fn add_transactions_with_header_to_database(&self, txs: Vec<Transaction>, header: Header) {
let provider = self.eth_provider();
Expand Down
51 changes: 15 additions & 36 deletions tests/tests/eth_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use kakarot_rpc::{
models::felt::Felt252Wrapper,
providers::eth_provider::{
constant::{MAX_LOGS, STARKNET_MODULUS},
database::{ethereum::EthereumTransactionStore, types::transaction::StoredPendingTransaction},
provider::EthereumProvider,
BlockProvider, ChainProvider, GasProvider, LogProvider, ReceiptProvider, StateProvider, TransactionProvider,
},
Expand Down Expand Up @@ -748,29 +747,16 @@ async fn test_send_raw_transaction(#[future] katana: Katana, _setup: ()) {
.await
.expect("failed to send transaction");

// Retrieve the transaction from the database
let tx: Option<StoredPendingTransaction> =
eth_provider.database().get_first().await.expect("Failed to get transaction");

// Assert that the number of retries is 0
assert_eq!(0, tx.clone().unwrap().retries);

let tx = tx.unwrap().tx;

// Assert the transaction hash and block number
assert_eq!(tx.hash, transaction_signed.hash());
assert!(tx.block_number.is_none());

// Retrieve the current size of the mempool
let mempool_size_after_send = eth_client.mempool().pool_size();
// Assert that the number of pending transactions in the mempool is 1
assert_eq!(mempool_size_after_send.pending, 1);
assert_eq!(mempool_size_after_send.total, 1);
let tx_in_mempool = eth_client.mempool().get(&tx.hash);
let tx_in_mempool = eth_client.mempool().get(&transaction_signed.hash());
// Assert that the transaction in the mempool exists
assert!(tx_in_mempool.is_some());
// Verify that the hash of the transaction in the mempool matches the expected hash
assert_eq!(tx_in_mempool.unwrap().hash(), *tx.hash);
assert_eq!(tx_in_mempool.unwrap().hash(), *transaction_signed.hash());
}

#[rstest]
Expand Down Expand Up @@ -1033,7 +1019,6 @@ async fn test_send_raw_transaction_pre_eip_155(#[future] katana: Katana, _setup:
#[tokio::test(flavor = "multi_thread")]
async fn test_send_raw_transaction_wrong_signature(#[future] katana: Katana, _setup: ()) {
// Given
let eth_provider = katana.eth_provider();
let eth_client = katana.eth_client();

// Create a sample transaction
Expand Down Expand Up @@ -1061,13 +1046,6 @@ async fn test_send_raw_transaction_wrong_signature(#[future] katana: Katana, _se
// Send the transaction
let _ = eth_client.send_raw_transaction(transaction_signed.envelope_encoded()).await;

// Retrieve the transaction from the database
let tx: Option<StoredPendingTransaction> =
eth_provider.database().get_first().await.expect("Failed to get transaction");

// Assert that no transaction is found
assert!(tx.is_none());

let mempool_size_after_send = eth_client.mempool().pool_size();
// Verify that the number of pending transactions in the mempool remains unchanged (0 tx)
assert_eq!(mempool_size_after_send.pending, 0);
Expand Down Expand Up @@ -1355,21 +1333,22 @@ async fn test_transaction_by_hash(#[future] katana: Katana, _setup: ()) {
.await
.expect("failed to send transaction");

// Retrieve the pending transaction from the database
let mut stored_transaction: StoredPendingTransaction =
eth_provider.database().get_first().await.expect("Failed to get transaction").unwrap();
// TODO: need to write this with the mempool
// // Retrieve the pending transaction from the database
// let mut stored_transaction: StoredPendingTransaction =
// eth_provider.database().get_first().await.expect("Failed to get transaction").unwrap();

let tx = stored_transaction.clone().tx;
// let tx = stored_transaction.clone().tx;

// Check if the pending transaction is returned correctly by the `transaction_by_hash` method
assert_eq!(eth_provider.transaction_by_hash(tx.hash).await.unwrap().unwrap(), tx);
// // Check if the pending transaction is returned correctly by the `transaction_by_hash` method
// assert_eq!(eth_provider.transaction_by_hash(tx.hash).await.unwrap().unwrap(), tx);

// Modify the block number of the pending transaction
stored_transaction.tx.block_number = Some(1111);
// // Modify the block number of the pending transaction
// stored_transaction.tx.block_number = Some(1111);

// Insert the transaction into the final transaction collection
eth_provider.database().upsert_transaction(stored_transaction.into()).await.expect("Failed to insert documents");
// // Insert the transaction into the final transaction collection
// eth_provider.database().upsert_transaction(stored_transaction.into()).await.expect("Failed to insert documents");

// Check if the final transaction is returned correctly by the `transaction_by_hash` method
assert_eq!(eth_provider.transaction_by_hash(tx.hash).await.unwrap().unwrap().block_number, Some(1111));
// // Check if the final transaction is returned correctly by the `transaction_by_hash` method
// assert_eq!(eth_provider.transaction_by_hash(tx.hash).await.unwrap().unwrap().block_number, Some(1111));
}
Loading

0 comments on commit 9622c0a

Please sign in to comment.