From 4f32004eb4e49dede8c480dc054f8bed1fc4e462 Mon Sep 17 00:00:00 2001 From: greged93 <82421016+greged93@users.noreply.github.com> Date: Wed, 11 Sep 2024 12:08:43 +0200 Subject: [PATCH] dev: account manager (#1373) * update the account manager to lower the constraints on the locks * lint * update naming --- src/pool/mempool.rs | 118 +++++++++++++++++++++++--------------------- 1 file changed, 61 insertions(+), 57 deletions(-) diff --git a/src/pool/mempool.rs b/src/pool/mempool.rs index 0ec82cc45..184983a6b 100644 --- a/src/pool/mempool.rs +++ b/src/pool/mempool.rs @@ -1,5 +1,6 @@ use super::validate::KakarotTransactionValidator; use crate::client::EthClient; +use futures::future::select_all; use reth_primitives::{BlockId, U256}; use reth_transaction_pool::{ blobstore::NoopBlobStore, CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionPool, @@ -7,7 +8,10 @@ use reth_transaction_pool::{ use serde_json::Value; use starknet::core::types::Felt; use std::{collections::HashMap, fs::File, io::Read, sync::Arc, time::Duration}; -use tokio::{runtime::Handle, sync::Mutex}; +use tokio::{ + runtime::Handle, + sync::{Mutex, MutexGuard}, +}; /// A type alias for the Kakarot Transaction Validator. /// Uses the Reth implementation [`TransactionValidationTaskExecutor`]. @@ -27,7 +31,7 @@ pub type KakarotPool = Pool, TransactionOrdering, Noop #[derive(Debug)] pub struct AccountManager { /// A shared, mutable collection of accounts and their nonces. - accounts: Arc>>, + accounts: HashMap>>, /// The Ethereum client used to interact with the blockchain. eth_client: Arc>, } @@ -59,14 +63,12 @@ impl AccountM .map_err(|e| eyre::eyre!("Error converting block ID: {:?}", e))?; // Query the initial account_nonce for the account from the provider - accounts.insert( - felt_address, - eth_client - .starknet_provider() - .get_nonce(starknet_block_id, felt_address) - .await - .unwrap_or_default(), - ); + let nonce = eth_client + .starknet_provider() + .get_nonce(starknet_block_id, felt_address) + .await + .unwrap_or_default(); + accounts.insert(felt_address, Arc::new(Mutex::new(nonce))); } } } @@ -75,44 +77,30 @@ impl AccountM return Err(eyre::eyre!("No accounts found in file")); } - Ok(Self { accounts: Arc::new(Mutex::new(accounts)), eth_client }) + Ok(Self { accounts, eth_client }) } /// Starts the account manager task that periodically checks account balances and processes transactions. - #[allow(clippy::significant_drop_tightening)] pub fn start(&'static self, rt_handle: &Handle) { - let accounts = self.accounts.clone(); - rt_handle.spawn(async move { loop { - // Get account addresses first without acquiring the lock - let account_addresses: Vec = { - let accounts = accounts.lock().await; - accounts.keys().copied().collect() - }; - - // Iterate over account addresses and check balances - for account_address in account_addresses { - // Fetch the balance and handle errors functionally - let balance = self - .get_balance(account_address) - .await - .inspect_err(|err| { - tracing::error!( - "Error getting balance for account_address {:?}: {:?}", - account_address, - err - ); - }) - .unwrap_or_default(); - - if balance > U256::from(u128::pow(10, 18)) { - // Acquire lock only when necessary to modify account state - let mut accounts = accounts.lock().await; - if let Some(account_nonce) = accounts.get_mut(&account_address) { - self.process_transaction(&account_address, account_nonce); - } - } + // TODO: add a listener on the pool and only try to call [`best_transaction`] + // TODO: when we are sure there is a transaction in the pool. This avoids an + // TODO: constant loop which rarely yields to the executor combined with a + // TODO: sleep which could sleep for a while before handling transactions. + let best_hashes = + self.eth_client.mempool().as_ref().best_transactions().map(|x| *x.hash()).collect::>(); + if let Some(best_hash) = best_hashes.first() { + let (_address, mut locked_account_nonce) = self.lock_account().await; + + // TODO: here we send the transaction on the starknet network + // Increment account_nonce after sending a transaction + *locked_account_nonce = *locked_account_nonce + 1; + + // Only release the lock once the transaction has been broadcast + drop(locked_account_nonce); + + self.eth_client.mempool().as_ref().remove_transactions(vec![*best_hash]); } tokio::time::sleep(Duration::from_secs(1)).await; @@ -120,6 +108,37 @@ impl AccountM }); } + /// Returns the next available account from the manager. + async fn lock_account(&self) -> (Felt, MutexGuard<'_, Felt>) + where + SP: starknet::providers::Provider + Send + Sync + Clone + 'static, + { + loop { + // use [`select_all`] to poll an iterator over impl Future)> + // We use Box::pin because this Future doesn't implement `Unpin`. + let fut_locks = + self.accounts.iter().map(|(address, nonce)| Box::pin(async { (*address, nonce.lock().await) })); + let ((account_address, guard), _, _) = select_all(fut_locks).await; + + // Fetch the balance of the selected account + let balance = self + .get_balance(account_address) + .await + .inspect_err(|err| { + tracing::error!(target: "account_manager", ?account_address, ?err, "failed to fetch balance"); + }) + .unwrap_or_default(); + + // If the balance is lower than the threshold, continue + if balance < U256::from(u128::pow(10, 18)) { + continue; + } + + // Return the account address and the guard on the nonce + return (account_address, guard); + } + } + /// Retrieves the balance of the specified account address. async fn get_balance(&self, account_address: Felt) -> eyre::Result { // Convert the optional Ethereum block ID to a Starknet block ID. @@ -127,19 +146,4 @@ impl AccountM // Get the balance of the address at the given block ID. self.eth_client.starknet_provider().balance_at(account_address, starknet_block_id).await.map_err(Into::into) } - - /// Processes a transaction for the given account if the balance is sufficient. - fn process_transaction(&self, _account_address: &Felt, account_nonce: &mut Felt) - where - SP: starknet::providers::Provider + Send + Sync + Clone + 'static, - { - let best_hashes = self.eth_client.mempool().as_ref().best_transactions().map(|x| *x.hash()).collect::>(); - - if let Some(best_hash) = best_hashes.first() { - self.eth_client.mempool().as_ref().remove_transactions(vec![*best_hash]); - - // Increment account_nonce after sending a transaction - *account_nonce = *account_nonce + 1; - } - } }