Skip to content

Commit

Permalink
Merge branch '1466_non_state' into 1466_fix_non_state
Browse files Browse the repository at this point in the history
  • Loading branch information
fishseabowl authored May 27, 2024
2 parents 3a19201 + 4717aac commit ad60b12
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 2 deletions.
4 changes: 4 additions & 0 deletions crates/katana/primitives/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use core::fmt;
use std::num::ParseIntError;

pub type ChunkSize = u64;
pub type EventContinuationToken = Option<String>;
pub type EventFilter = starknet::core::types::EventFilter;
pub type EventsPage = starknet::core::types::EventsPage;
use crate::FieldElement;

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down
1 change: 1 addition & 0 deletions crates/katana/primitives/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{utils, FieldElement};
pub type TxHash = FieldElement;
/// The sequential number for all the transactions.
pub type TxNumber = u64;
pub type Transaction = starknet::core::types::Transaction;

#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
Expand Down
248 changes: 246 additions & 2 deletions crates/katana/storage/provider/src/providers/fork/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ use futures::channel::mpsc::{channel as async_channel, Receiver, SendError, Send
use futures::future::BoxFuture;
use futures::stream::Stream;
use futures::{Future, FutureExt};
use katana_primitives::block::BlockHashOrNumber;
use katana_primitives::block::{BlockHashOrNumber, BlockIdOrTag};
use katana_primitives::class::{ClassHash, CompiledClass, CompiledClassHash, FlattenedSierraClass};
use katana_primitives::contract::{ContractAddress, Nonce, StorageKey, StorageValue};
use katana_primitives::conversion::rpc::{
compiled_class_hash_from_flattened_sierra_class, flattened_sierra_to_compiled_class,
legacy_rpc_to_compiled_class,
};
use katana_primitives::event::{ChunkSize, EventContinuationToken, EventFilter, EventsPage};
use katana_primitives::transaction::{Transaction, TxHash, TxNumber};
use katana_primitives::FieldElement;
use parking_lot::Mutex;
use starknet::core::types::{BlockId, ContractClass as RpcContractClass, StarknetError};
Expand All @@ -30,7 +32,20 @@ use crate::traits::contract::ContractClassProvider;
use crate::traits::state::StateProvider;
use crate::ProviderResult;


const LOG_TARGET: &str = "forking::backend";
type GetNonceResult = Result<Nonce, ForkedBackendError>;
type GetStorageResult = Result<StorageValue, ForkedBackendError>;
type GetClassHashAtResult = Result<ClassHash, ForkedBackendError>;
type GetClassAtResult = Result<starknet::core::types::ContractClass, ForkedBackendError>;
type GetEventResult = Result<EventsPage, ForkedBackendError>;
type GetBlockWithTxHashesResult =
Result<starknet::core::types::MaybePendingBlockWithTxHashes, ForkedBackendError>;
type GetBlockWithTxsResult =
Result<starknet::core::types::MaybePendingBlockWithTxs, ForkedBackendError>;
type GetTransactionResult = Result<Transaction, ForkedBackendError>;
type GetTransactionReceiptResult =
Result<starknet::core::types::MaybePendingTransactionReceipt, ForkedBackendError>;

type BackendResult<T> = Result<T, BackendError>;

Expand Down Expand Up @@ -67,6 +82,12 @@ enum BackendRequest {
Class(Request<ClassHash, RpcContractClass>),
ClassHash(Request<ContractAddress, ClassHash>),
Storage(Request<(ContractAddress, StorageKey), StorageValue>),
GetEvents(EventFilter, EventContinuationToken, ChunkSize, OneshotSender<GetEventResult>),
GetBlockWithTxHash(BlockIdOrTag, OneshotSender<GetBlockWithTxHashesResult>),
GetBlockWithTxs(BlockIdOrTag, OneshotSender<GetBlockWithTxsResult>),
GetTransactionByBlockIdAndIndex(BlockIdOrTag, TxNumber, OneshotSender<GetTransactionResult>),
GetTransactionByHash(TxHash, OneshotSender<GetTransactionResult>),
GetTransactionReceipt(TxHash, OneshotSender<GetTransactionReceiptResult>),
// Test-only request kind for requesting the backend stats
#[cfg(test)]
Stats(OneshotSender<usize>),
Expand Down Expand Up @@ -107,6 +128,7 @@ impl BackendRequest {
let (sender, receiver) = oneshot();
(BackendRequest::Stats(sender), receiver)
}

}

type BackendRequestFuture = BoxFuture<'static, ()>;
Expand Down Expand Up @@ -234,10 +256,89 @@ where
self.pending_requests.push(fut);
}


#[cfg(test)]
BackendRequest::Stats(sender) => {
let total_ongoing_request = self.pending_requests.len();
sender.send(total_ongoing_request).expect("failed to send backend stats");

BackendRequest::GetEvents(filter, continuation_token, chunks_size, sender) => {
let fut = Box::pin(async move {
let res = provider
.get_events(filter, continuation_token, chunks_size)
.await
.map_err(ForkedBackendError::StarknetProvider);

sender.send(res).expect("failed to send events result")
});

self.pending_requests.push(fut);
}

BackendRequest::GetBlockWithTxHash(block_id, sender) => {
let fut = Box::pin(async move {
let res = provider
.get_block_with_tx_hashes(block_id)
.await
.map_err(ForkedBackendError::StarknetProvider);

sender.send(res).expect("failed to send block result")
});

self.pending_requests.push(fut);
}

BackendRequest::GetBlockWithTxs(block_id, sender) => {
let fut = Box::pin(async move {
let res = provider
.get_block_with_txs(block_id)
.await
.map_err(ForkedBackendError::StarknetProvider);

sender.send(res).expect("failed to send block result")
});

self.pending_requests.push(fut);
}

BackendRequest::GetTransactionByBlockIdAndIndex(block_id, index, sender) => {
let fut = Box::pin(async move {
let res = provider
.get_transaction_by_block_id_and_index(block_id, index)
.await
.map_err(ForkedBackendError::StarknetProvider);

sender.send(res).expect("failed to send transaction result")
});

self.pending_requests.push(fut);
}

BackendRequest::GetTransactionByHash(transaction_hash, sender) => {
let fut = Box::pin(async move {
let res = provider
.get_transaction_by_hash(transaction_hash)
.await
.map_err(ForkedBackendError::StarknetProvider);

sender.send(res).expect("failed to send transaction result")
});

self.pending_requests.push(fut);
}

BackendRequest::GetTransactionReceipt(transaction_hash, sender) => {
let fut = Box::pin(async move {
let res = provider
.get_transaction_receipt(transaction_hash)
.await
.map_err(ForkedBackendError::StarknetProvider);

sender.send(res).expect("failed to send transaction result")
});

self.pending_requests.push(fut);

}
}
}
Expand Down Expand Up @@ -314,6 +415,7 @@ impl BackendHandle {
&self,
address: ContractAddress,
key: StorageKey,

) -> Result<StorageValue, BackendError> {
trace!(target: LOG_TARGET, %address, key = %format!("{key:#x}"), "Requesting contract storage.");
let (req, rx) = BackendRequest::storage(address, key);
Expand Down Expand Up @@ -352,6 +454,7 @@ impl BackendHandle {
}
}


/// Send a request to the backend thread.
fn request(&self, req: BackendRequest) -> Result<(), BackendError> {
self.0.lock().try_send(req).map_err(|e| e.into_send_error())?;
Expand All @@ -363,6 +466,87 @@ impl BackendHandle {
let (req, rx) = BackendRequest::stats();
self.request(req)?;
Ok(rx.recv()?)

pub fn do_get_events(
&self,
filter: EventFilter,
continuation_token: Option<String>,
chunks_size: ChunkSize,
) -> Result<EventsPage, ForkedBackendError> {
trace!(target: LOG_TARGET, "requesting evetns at filter{filter:#?}, continuation_token {continuation_token:#?}, and chunks_size {chunks_size:#?} ");
let (sender, rx) = oneshot();
self.0
.lock()
.try_send(BackendRequest::GetEvents(filter, continuation_token, chunks_size, sender))
.map_err(|e| e.into_send_error())?;
rx.recv()?
}

pub fn do_get_block_with_tx_hashes(
&self,
block_id: BlockIdOrTag,
) -> Result<starknet::core::types::MaybePendingBlockWithTxHashes, ForkedBackendError> {
trace!(target: LOG_TARGET, "requesting block with tx_hashes at block {block_id:#?} ");
let (sender, rx) = oneshot();
self.0
.lock()
.try_send(BackendRequest::GetBlockWithTxHash(block_id, sender))
.map_err(|e| e.into_send_error())?;
rx.recv()?
}

pub fn do_get_block_with_txs(
&self,
block_id: BlockIdOrTag,
) -> Result<starknet::core::types::MaybePendingBlockWithTxs, ForkedBackendError> {
trace!(target: LOG_TARGET, "requesting block with txs at block {block_id:#?} ");
let (sender, rx) = oneshot();
self.0
.lock()
.try_send(BackendRequest::GetBlockWithTxs(block_id, sender))
.map_err(|e| e.into_send_error())?;
rx.recv()?
}

pub fn do_get_transaction_by_block_id_and_index(
&self,
block_id: BlockIdOrTag,
index: TxNumber,
) -> Result<Transaction, ForkedBackendError> {
trace!(target: LOG_TARGET, "requesting transaction at block {block_id:#?}, index {index:#?}");
let (sender, rx) = oneshot();
self.0
.lock()
.try_send(BackendRequest::GetTransactionByBlockIdAndIndex(block_id, index, sender))
.map_err(|e| e.into_send_error())?;
rx.recv()?
}

pub fn do_get_transaction_by_hash(
&self,
transaction_hash: TxHash,
) -> Result<Transaction, ForkedBackendError> {
trace!(target: LOG_TARGET, "requesting transaction at trasanction hash {transaction_hash:#?} ");
let (sender, rx) = oneshot();
self.0
.lock()
.try_send(BackendRequest::GetTransactionByHash(transaction_hash, sender))
.map_err(|e| e.into_send_error())?;
rx.recv()?
}

pub fn do_get_transaction_receipt(
&self,
transaction_hash: TxHash,
) -> Result<starknet::core::types::MaybePendingTransactionReceipt, ForkedBackendError> {
trace!(target: LOG_TARGET, "requesting transaction receipt at trasanction hash {transaction_hash:#?} ");
let (sender, rx) = oneshot();
self.0
.lock()
.try_send(BackendRequest::GetTransactionReceipt(transaction_hash, sender))
.map_err(|e| e.into_send_error())?;
rx.recv()?

}
}

Expand Down Expand Up @@ -409,10 +593,12 @@ impl StateProvider for SharedStateProvider {
return Ok(nonce);
}


if let Some(nonce) = handle_not_found_err(self.0.get_nonce(address)).map_err(|error| {
error!(target: LOG_TARGET, %address, %error, "Fetching nonce.");
error
})? {

self.0.contract_state.write().entry(address).or_default().nonce = nonce;
Ok(Some(nonce))
} else {
Expand All @@ -430,7 +616,6 @@ impl StateProvider for SharedStateProvider {
{
return Ok(value.copied());
}

let value =
handle_not_found_err(self.0.get_storage(address, storage_key)).map_err(|error| {
error!(target: LOG_TARGET, %address, storage_key = %format!("{storage_key:#x}"), %error, "Fetching storage value.");
Expand Down Expand Up @@ -599,6 +784,7 @@ fn handle_not_found_err<T>(result: Result<T, BackendError>) -> Result<Option<T>,
}

#[cfg(test)]

pub(crate) mod test_utils {

use std::sync::mpsc::sync_channel;
Expand Down Expand Up @@ -645,6 +831,7 @@ mod tests {

use std::time::Duration;


use katana_primitives::contract::GenericContractInfo;
use starknet::macros::felt;

Expand Down Expand Up @@ -788,4 +975,61 @@ mod tests {
assert_eq!(class_hash, class_hash_in_cache, "value must be stored in cache");
assert_eq!(storage_value, storage_value_in_cache, "value must be stored in cache");
}

#[test]
fn fetch_non_state_data_from_fork() {
let (backend, _) = create_forked_backend(LOCAL_RPC_URL.into(), 1);

assert!(
backend
.do_get_events(
EventFilter {
from_block: Some(starknet::core::types::BlockId::Tag(BlockTag::Latest)),
to_block: Some(starknet::core::types::BlockId::Tag(BlockTag::Latest)),
address: None,
keys: None,
},
None,
10,
)
.is_ok()
);

assert!(
backend.do_get_block_with_tx_hashes(starknet::core::types::BlockId::Number(0)).is_ok()
);

assert!(backend.do_get_block_with_txs(starknet::core::types::BlockId::Number(0)).is_ok());

assert!(
backend
.do_get_transaction_by_block_id_and_index(
starknet::core::types::BlockId::Number(0),
1
)
.is_ok()
);

assert!(
backend
.do_get_transaction_by_hash(
FieldElement::from_hex_be(
"0x41a78e741e5af2fec34b695679bc6891742439f7afb8484ecd7766661ad02bf",
)
.unwrap()
)
.is_err()
);

assert!(
backend
.do_get_transaction_receipt(
FieldElement::from_hex_be(
"0x41a78e741e5af2fec34b695679bc6891742439f7afb8484ecd7766661ad02bf",
)
.unwrap()
)
.is_err()
);
}
}

0 comments on commit ad60b12

Please sign in to comment.