Skip to content

Commit

Permalink
dev: account manager (#1373)
Browse files Browse the repository at this point in the history
* update the account manager to lower the constraints on the locks

* lint

* update naming
  • Loading branch information
greged93 authored Sep 11, 2024
1 parent 85b7217 commit 4f32004
Showing 1 changed file with 61 additions and 57 deletions.
118 changes: 61 additions & 57 deletions src/pool/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use super::validate::KakarotTransactionValidator;
use crate::client::EthClient;
use futures::future::select_all;
use reth_primitives::{BlockId, U256};
use reth_transaction_pool::{
blobstore::NoopBlobStore, CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionPool,
};
use serde_json::Value;
use starknet::core::types::Felt;
use std::{collections::HashMap, fs::File, io::Read, sync::Arc, time::Duration};
use tokio::{runtime::Handle, sync::Mutex};
use tokio::{
runtime::Handle,
sync::{Mutex, MutexGuard},
};

/// A type alias for the Kakarot Transaction Validator.
/// Uses the Reth implementation [`TransactionValidationTaskExecutor`].
Expand All @@ -27,7 +31,7 @@ pub type KakarotPool<Client> = Pool<Validator<Client>, TransactionOrdering, Noop
#[derive(Debug)]
pub struct AccountManager<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> {
/// A shared, mutable collection of accounts and their nonces.
accounts: Arc<Mutex<HashMap<Felt, Felt>>>,
accounts: HashMap<Felt, Arc<Mutex<Felt>>>,
/// The Ethereum client used to interact with the blockchain.
eth_client: Arc<EthClient<SP>>,
}
Expand Down Expand Up @@ -59,14 +63,12 @@ impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountM
.map_err(|e| eyre::eyre!("Error converting block ID: {:?}", e))?;

// Query the initial account_nonce for the account from the provider
accounts.insert(
felt_address,
eth_client
.starknet_provider()
.get_nonce(starknet_block_id, felt_address)
.await
.unwrap_or_default(),
);
let nonce = eth_client
.starknet_provider()
.get_nonce(starknet_block_id, felt_address)
.await
.unwrap_or_default();
accounts.insert(felt_address, Arc::new(Mutex::new(nonce)));
}
}
}
Expand All @@ -75,71 +77,73 @@ impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountM
return Err(eyre::eyre!("No accounts found in file"));
}

Ok(Self { accounts: Arc::new(Mutex::new(accounts)), eth_client })
Ok(Self { accounts, eth_client })
}

/// Starts the account manager task that periodically checks account balances and processes transactions.
#[allow(clippy::significant_drop_tightening)]
pub fn start(&'static self, rt_handle: &Handle) {
let accounts = self.accounts.clone();

rt_handle.spawn(async move {
loop {
// Get account addresses first without acquiring the lock
let account_addresses: Vec<Felt> = {
let accounts = accounts.lock().await;
accounts.keys().copied().collect()
};

// Iterate over account addresses and check balances
for account_address in account_addresses {
// Fetch the balance and handle errors functionally
let balance = self
.get_balance(account_address)
.await
.inspect_err(|err| {
tracing::error!(
"Error getting balance for account_address {:?}: {:?}",
account_address,
err
);
})
.unwrap_or_default();

if balance > U256::from(u128::pow(10, 18)) {
// Acquire lock only when necessary to modify account state
let mut accounts = accounts.lock().await;
if let Some(account_nonce) = accounts.get_mut(&account_address) {
self.process_transaction(&account_address, account_nonce);
}
}
// TODO: add a listener on the pool and only try to call [`best_transaction`]
// TODO: when we are sure there is a transaction in the pool. This avoids an
// TODO: constant loop which rarely yields to the executor combined with a
// TODO: sleep which could sleep for a while before handling transactions.
let best_hashes =
self.eth_client.mempool().as_ref().best_transactions().map(|x| *x.hash()).collect::<Vec<_>>();
if let Some(best_hash) = best_hashes.first() {
let (_address, mut locked_account_nonce) = self.lock_account().await;

// TODO: here we send the transaction on the starknet network
// Increment account_nonce after sending a transaction
*locked_account_nonce = *locked_account_nonce + 1;

// Only release the lock once the transaction has been broadcast
drop(locked_account_nonce);

self.eth_client.mempool().as_ref().remove_transactions(vec![*best_hash]);
}

tokio::time::sleep(Duration::from_secs(1)).await;
}
});
}

/// Returns the next available account from the manager.
async fn lock_account(&self) -> (Felt, MutexGuard<'_, Felt>)
where
SP: starknet::providers::Provider + Send + Sync + Clone + 'static,
{
loop {
// use [`select_all`] to poll an iterator over impl Future<Output = (Felt, MutexGuard<Felt>)>
// We use Box::pin because this Future doesn't implement `Unpin`.
let fut_locks =
self.accounts.iter().map(|(address, nonce)| Box::pin(async { (*address, nonce.lock().await) }));
let ((account_address, guard), _, _) = select_all(fut_locks).await;

// Fetch the balance of the selected account
let balance = self
.get_balance(account_address)
.await
.inspect_err(|err| {
tracing::error!(target: "account_manager", ?account_address, ?err, "failed to fetch balance");
})
.unwrap_or_default();

// If the balance is lower than the threshold, continue
if balance < U256::from(u128::pow(10, 18)) {
continue;
}

// Return the account address and the guard on the nonce
return (account_address, guard);
}
}

/// Retrieves the balance of the specified account address.
async fn get_balance(&self, account_address: Felt) -> eyre::Result<U256> {
// Convert the optional Ethereum block ID to a Starknet block ID.
let starknet_block_id = self.eth_client.eth_provider().to_starknet_block_id(Some(BlockId::default())).await?;
// Get the balance of the address at the given block ID.
self.eth_client.starknet_provider().balance_at(account_address, starknet_block_id).await.map_err(Into::into)
}

/// Processes a transaction for the given account if the balance is sufficient.
fn process_transaction(&self, _account_address: &Felt, account_nonce: &mut Felt)
where
SP: starknet::providers::Provider + Send + Sync + Clone + 'static,
{
let best_hashes = self.eth_client.mempool().as_ref().best_transactions().map(|x| *x.hash()).collect::<Vec<_>>();

if let Some(best_hash) = best_hashes.first() {
self.eth_client.mempool().as_ref().remove_transactions(vec![*best_hash]);

// Increment account_nonce after sending a transaction
*account_nonce = *account_nonce + 1;
}
}
}

0 comments on commit 4f32004

Please sign in to comment.