Skip to content

Commit

Permalink
Add KakarotTransactions trait to upstream send_raw_transaction (#…
Browse files Browse the repository at this point in the history
…1352)

* Add KakarotTransactions trait to upstream send_raw_transaction

* fmt

* Update src/client/mod.rs

Co-authored-by: greged93 <[email protected]>

* fix conflicts

* trigger ci

---------

Co-authored-by: greged93 <[email protected]>
  • Loading branch information
tcoratger and greged93 authored Sep 5, 2024
1 parent 95e0a0f commit 45f3fb5
Show file tree
Hide file tree
Showing 14 changed files with 224 additions and 200 deletions.
109 changes: 100 additions & 9 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,39 @@
use crate::{
models::transaction::validate_transaction,
pool::{
mempool::{KakarotPool, TransactionOrdering},
validate::KakarotTransactionValidatorBuilder,
},
providers::eth_provider::{
database::{state::EthDatabase, Database},
error::KakarotError,
provider::EthDataProvider,
chain::ChainProvider,
database::{
ethereum::{EthereumBlockStore, EthereumTransactionStore},
state::EthDatabase,
Database,
},
error::{EthApiError, EthereumDataFormatError, KakarotError, SignatureError, TransactionError},
provider::{EthApiResult, EthDataProvider},
starknet::kakarot_core::to_starknet_transaction,
},
};
use alloy_rlp::Decodable;
use async_trait::async_trait;
use num_traits::ToPrimitive;
use reth_chainspec::ChainSpec;
use reth_transaction_pool::{blobstore::NoopBlobStore, EthPooledTransaction, PoolConfig};
use reth_primitives::{Bytes, TransactionSigned, TransactionSignedEcRecovered, B256};
use reth_rpc_types_compat::transaction::from_recovered;
use reth_transaction_pool::{
blobstore::NoopBlobStore, EthPooledTransaction, PoolConfig, TransactionOrigin, TransactionPool,
};
use starknet::{core::types::Felt, providers::Provider};
use std::sync::Arc;
use tracing::Instrument;

#[async_trait]
pub trait KakarotTransactions {
/// Send a raw transaction to the network and returns the transactions hash.
async fn send_raw_transaction(&self, transaction: Bytes) -> EthApiResult<B256>;
}

/// Provides a wrapper structure around the Ethereum Provider
/// and the Mempool.
Expand All @@ -27,16 +47,15 @@ impl<SP> EthClient<SP>
where
SP: Provider + Clone + Sync + Send,
{
/// Tries to start a [`EthClient`] by fetching the current chain id, initializing a [`EthDataProvider`] and
/// a `Pool`.
/// Tries to start a [`EthClient`] by fetching the current chain id, initializing a [`EthDataProvider`] and a [`Pool`].
pub async fn try_new(starknet_provider: SP, database: Database) -> eyre::Result<Self> {
let chain = (starknet_provider.chain_id().await.map_err(KakarotError::from)?.to_bigint()
& Felt::from(u32::MAX).to_bigint())
.to_u64()
.unwrap();

// Create a new EthDataProvider instance with the initialized database and Starknet provider.
let mut eth_provider = EthDataProvider::try_new(database, starknet_provider).await?;
let eth_provider = EthDataProvider::try_new(database, starknet_provider).await?;

let validator =
KakarotTransactionValidatorBuilder::new(Arc::new(ChainSpec { chain: chain.into(), ..Default::default() }))
Expand All @@ -49,8 +68,6 @@ where
PoolConfig::default(),
));

eth_provider.set_mempool(pool.clone());

Ok(Self { eth_provider, pool })
}

Expand All @@ -64,3 +81,77 @@ where
self.pool.clone()
}
}

#[async_trait]
impl<SP> KakarotTransactions for EthClient<SP>
where
SP: Provider + Clone + Sync + Send,
{
async fn send_raw_transaction(&self, transaction: Bytes) -> EthApiResult<B256> {
// Decode the transaction data
let transaction_signed = TransactionSigned::decode(&mut transaction.0.as_ref())
.map_err(|_| EthApiError::EthereumDataFormat(EthereumDataFormatError::TransactionConversion))?;

let chain_id: u64 = self
.eth_provider
.chain_id()
.await?
.unwrap_or_default()
.try_into()
.map_err(|_| TransactionError::InvalidChainId)?;

// Validate the transaction
let latest_block_header =
self.eth_provider.database().latest_header().await?.ok_or(EthApiError::UnknownBlockNumber(None))?;
validate_transaction(&transaction_signed, chain_id, &latest_block_header)?;

// Recover the signer from the transaction
let signer = transaction_signed.recover_signer().ok_or(SignatureError::Recovery)?;

// Get the number of retries for the transaction
let retries = self.eth_provider.database().pending_transaction_retries(&transaction_signed.hash).await?;

let transaction_signed_ec_recovered =
TransactionSignedEcRecovered::from_signed_transaction(transaction_signed.clone(), signer);

let encoded_length = transaction_signed_ec_recovered.clone().length_without_header();

// Upsert the transaction as pending in the database
let transaction = from_recovered(transaction_signed_ec_recovered.clone());
self.eth_provider.database().upsert_pending_transaction(transaction, retries).await?;

// Convert the Ethereum transaction to a Starknet transaction
let starknet_transaction = to_starknet_transaction(&transaction_signed, signer, retries)?;

// Deploy EVM transaction signer if Hive feature is enabled
#[cfg(feature = "hive")]
self.eth_provider.deploy_evm_transaction_signer(signer).await?;

// Add the transaction to the Starknet provider
let span = tracing::span!(tracing::Level::INFO, "sn::add_invoke_transaction");
let res = self
.eth_provider
.starknet_provider()
.add_invoke_transaction(starknet_transaction)
.instrument(span)
.await
.map_err(KakarotError::from)?;

let pool_transaction = EthPooledTransaction::new(transaction_signed_ec_recovered, encoded_length);

// Don't handle the result in case we are adding multiple times the same transaction due to the retry.
let _ = self.pool.as_ref().add_transaction(TransactionOrigin::Local, pool_transaction).await;

// Return transaction hash if testing feature is enabled, otherwise log and return Ethereum hash
if cfg!(feature = "testing") {
return Ok(B256::from_slice(&res.transaction_hash.to_bytes_be()[..]));
}
let hash = transaction_signed.hash();
tracing::info!(
ethereum_hash = ?hash,
starknet_hash = ?B256::from_slice(&res.transaction_hash.to_bytes_be()[..]),
);

Ok(hash)
}
}
4 changes: 2 additions & 2 deletions src/eth_rpc/servers/eth_rpc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(clippy::blocks_in_conditions)]

use crate::{
client::EthClient,
client::{EthClient, KakarotTransactions},
eth_rpc::api::eth_api::EthApiServer,
providers::eth_provider::{
constant::MAX_PRIORITY_FEE_PER_GAS, error::EthApiError, BlockProvider, ChainProvider, GasProvider, LogProvider,
Expand Down Expand Up @@ -238,7 +238,7 @@ where
#[tracing::instrument(skip_all, ret, err)]
async fn send_raw_transaction(&self, bytes: Bytes) -> Result<B256> {
tracing::info!("Serving eth_sendRawTransaction");
Ok(self.eth_client.eth_provider().send_raw_transaction(bytes).await?)
Ok(self.eth_client.send_raw_transaction(bytes).await?)
}

async fn sign(&self, _address: Address, _message: Bytes) -> Result<Bytes> {
Expand Down
3 changes: 1 addition & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,9 @@ async fn main() -> Result<()> {
let starknet_provider = Arc::new(starknet_provider);

let eth_client = EthClient::try_new(starknet_provider, db.clone()).await.expect("failed to start ethereum client");
let eth_provider = eth_client.eth_provider().clone();

// Setup the retry handler
let retry_handler = RetryHandler::new(eth_provider, db);
let retry_handler = RetryHandler::new(eth_client.clone(), db);
retry_handler.start(&tokio::runtime::Handle::current());

// Setup the RPC module
Expand Down
17 changes: 8 additions & 9 deletions src/pool/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::validate::KakarotTransactionValidator;
use crate::providers::eth_provider::provider::EthDataProvider;
use crate::pool::EthClient;
use reth_transaction_pool::{
blobstore::NoopBlobStore, CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionPool,
};
Expand Down Expand Up @@ -58,35 +58,34 @@ impl AccountManager {
Self { accounts }
}

pub fn start<SP>(&self, rt_handle: &Handle, eth_provider: &'static EthDataProvider<SP>)
pub fn start<SP>(&self, rt_handle: &Handle, eth_client: &'static EthClient<SP>)
where
SP: starknet::providers::Provider + Send + Sync + 'static,
SP: starknet::providers::Provider + Send + Sync + Clone + 'static,
{
let accounts = self.accounts.clone();

rt_handle.spawn(async move {
loop {
for address in &accounts {
Self::process_transaction(address, eth_provider);
Self::process_transaction(address, eth_client);
}

tokio::time::sleep(Duration::from_secs(1)).await;
}
});
}

fn process_transaction<SP>(address: &Felt, eth_provider: &EthDataProvider<SP>)
fn process_transaction<SP>(address: &Felt, eth_client: &EthClient<SP>)
where
SP: starknet::providers::Provider + Send + Sync + 'static,
SP: starknet::providers::Provider + Send + Sync + Clone + 'static,
{
let balance = Self::check_balance(address);

if balance > Felt::ONE {
let best_hashes =
eth_provider.mempool.as_ref().unwrap().best_transactions().map(|x| *x.hash()).collect::<Vec<_>>();
let best_hashes = eth_client.mempool().as_ref().best_transactions().map(|x| *x.hash()).collect::<Vec<_>>();

if let Some(best_hash) = best_hashes.first() {
eth_provider.mempool.as_ref().unwrap().remove_transactions(vec![*best_hash]);
eth_client.mempool().as_ref().remove_transactions(vec![*best_hash]);
}
}
}
Expand Down
30 changes: 16 additions & 14 deletions src/pool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
#![allow(clippy::used_underscore_binding)]
use crate::providers::eth_provider::{
database::{
use crate::{
client::{EthClient, KakarotTransactions},
providers::eth_provider::database::{
ethereum::EthereumTransactionStore,
filter::{self, EthDatabaseFilterBuilder},
types::transaction::StoredPendingTransaction,
Database,
},
provider::EthereumProvider,
};
use eyre::Result;
use opentelemetry::metrics::{Gauge, Unit};
use reth_primitives::{TransactionSignedEcRecovered, B256};
use starknet::providers::Provider;
use std::{
fmt,
str::FromStr,
Expand All @@ -37,9 +38,9 @@ pub fn get_transaction_max_retries() -> u8 {
}

/// The [`RetryHandler`] is responsible for retrying transactions that have failed.
pub struct RetryHandler<P: EthereumProvider> {
/// The Ethereum provider.
eth_provider: P,
pub struct RetryHandler<SP: Provider + Clone + Send + Sync> {
/// The Ethereum client.
eth_client: EthClient<SP>,
/// The database.
database: Database,
/// The time to retry transactions recorded in an Observable.
Expand All @@ -49,29 +50,29 @@ pub struct RetryHandler<P: EthereumProvider> {
retried: Arc<Mutex<Vec<B256>>>,
}

impl<P: EthereumProvider> fmt::Debug for RetryHandler<P> {
impl<SP: Provider + Clone + Send + Sync> fmt::Debug for RetryHandler<SP> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RetryHandler")
.field("eth_provider", &"...")
.field("eth_client", &"...")
.field("database", &self.database)
.finish_non_exhaustive()
}
}

impl<P> RetryHandler<P>
impl<SP> RetryHandler<SP>
where
P: EthereumProvider + Send + Sync + 'static,
SP: Provider + Clone + Send + Sync + 'static,
{
/// Creates a new [`RetryHandler`] with the given Ethereum provider, database.
#[allow(clippy::missing_const_for_fn)]
pub fn new(eth_provider: P, database: Database) -> Self {
pub fn new(eth_client: EthClient<SP>, database: Database) -> Self {
let retry_time = opentelemetry::global::meter("retry_service")
.u64_gauge("retry_time")
.with_description("The time taken to process pending transactions")
.with_unit(Unit::new("microseconds"))
.init();
Self {
eth_provider,
eth_client,
database,
retry_time,
#[cfg(test)]
Expand Down Expand Up @@ -126,7 +127,7 @@ where
}
};

let _hash = self.eth_provider.send_raw_transaction(transaction.into_signed().envelope_encoded()).await?;
let _hash = self.eth_client.send_raw_transaction(transaction.into_signed().envelope_encoded()).await?;
#[cfg(test)]
self.retried.lock().await.push(_hash);
Ok(())
Expand Down Expand Up @@ -170,8 +171,9 @@ mod tests {
async fn test_retry_handler(#[future] katana: Katana, _setup: ()) {
// Given
let eth_provider = katana.eth_provider();
let eth_client = katana.eth_client();
let db = eth_provider.database().clone();
let retry_handler = RetryHandler::new(eth_provider.clone(), db.clone());
let retry_handler = RetryHandler::new(eth_client, db.clone());

// Insert the first transaction into the pending transactions collection with 0 pool
let transaction1 = katana.eoa().mock_transaction_with_nonce(0).await.expect("Failed to get mock transaction");
Expand Down
15 changes: 1 addition & 14 deletions src/providers/eth_provider/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ use itertools::Itertools;
use mongodb::bson::doc;
use num_traits::cast::ToPrimitive;
use reth_primitives::{BlockId, BlockNumberOrTag, TxKind, U256};
use std::sync::Arc;

use crate::pool::mempool::KakarotPool;
use reth_rpc_types::{BlockHashOrNumber, TransactionRequest};
use starknet::core::types::Felt;
use tracing::{instrument, Instrument};
Expand Down Expand Up @@ -66,7 +63,6 @@ pub struct EthDataProvider<SP: starknet::providers::Provider + Send + Sync> {
database: Database,
starknet_provider: SP,
pub(crate) chain_id: u64,
pub mempool: Option<Arc<KakarotPool<Self>>>,
}

impl<SP> EthDataProvider<SP>
Expand All @@ -82,11 +78,6 @@ where
pub const fn starknet_provider(&self) -> &SP {
&self.starknet_provider
}

/// Returns a reference to the pool.
pub fn mempool(&self) -> Option<Arc<KakarotPool<Self>>> {
self.mempool.clone()
}
}

impl<SP> EthDataProvider<SP>
Expand All @@ -100,11 +91,7 @@ where
let chain_id =
(Felt::from(u32::MAX).to_biguint() & starknet_provider.chain_id().await?.to_biguint()).try_into().unwrap(); // safe unwrap

Ok(Self { database, starknet_provider, chain_id, mempool: None })
}

pub fn set_mempool(&mut self, mempool: Arc<KakarotPool<Self>>) {
self.mempool = Some(mempool);
Ok(Self { database, starknet_provider, chain_id })
}

/// Prepare the call input for an estimate gas or call from a transaction request.
Expand Down
Loading

0 comments on commit 45f3fb5

Please sign in to comment.