-
Notifications
You must be signed in to change notification settings - Fork 1.6k
feat(rpc-eth-types+api): add latest chain state tracking to EthStateCache
and EthStateCacheService
#13164
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(rpc-eth-types+api): add latest chain state tracking to EthStateCache
and EthStateCacheService
#13164
Changes from 6 commits
edd46a6
f00001a
754fcba
7f57a12
a266dc3
2479ec6
e009b41
cfe7d1e
e858027
0fde94f
b50de5c
620bc42
dbd1815
c27c947
dceb43a
cedf7d8
0bb8715
1cae39d
8563048
8956533
ecb564b
7cfa72c
02ef18a
644b7dc
18a57fa
2f38b5f
3b79091
fa5d34f
f042047
db44db4
4c81717
2371acb
3ce7ba9
013341b
377c3a3
6c1fad8
debcb2a
6698c49
28fd51c
6634d44
8b83ff0
af8907e
06633c5
24a62d5
d2487cc
f3289d4
364036d
740398f
ff7ee89
0b73703
f32b565
21ab6af
f7940a1
fe164d8
f5ffee0
b2133b7
e545177
9585e7e
0b4af51
e38c64e
d82ee64
e356dfb
63d72d4
2b37cb4
69a568f
f47a205
c3892d9
31b3fa4
0c41422
3ba3a65
c554f69
4bdde14
59c2a1a
6975dca
9b575f6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,7 @@ use alloy_consensus::BlockHeader; | |
use alloy_eips::BlockId; | ||
use alloy_primitives::Sealable; | ||
use alloy_rlp::Encodable; | ||
use alloy_rpc_types_eth::{Block, BlockTransactions, Header, Index}; | ||
use alloy_rpc_types_eth::{Block, BlockNumberOrTag, BlockTransactions, Header, Index}; | ||
use futures::Future; | ||
use reth_node_api::BlockBody; | ||
use reth_primitives::{SealedBlockFor, SealedBlockWithSenders}; | ||
|
@@ -100,24 +100,33 @@ pub trait EthBlocks: LoadBlock { | |
.provider() | ||
.pending_block() | ||
.map_err(Self::Error::from_eth_err)? | ||
.map(|block| block.body.transactions().len())) | ||
.map(|block| block.body.transactions().len())); | ||
} | ||
|
||
let block_hash = match self | ||
.provider() | ||
.block_hash_for_id(block_id) | ||
.map_err(Self::Error::from_eth_err)? | ||
{ | ||
Some(block_hash) => block_hash, | ||
None => return Ok(None), | ||
}; | ||
|
||
Ok(self | ||
.cache() | ||
.get_sealed_block_with_senders(block_hash) | ||
.await | ||
.map_err(Self::Error::from_eth_err)? | ||
.map(|b| b.body.transactions().len())) | ||
match block_id { | ||
BlockId::Number(BlockNumberOrTag::Latest) => Ok(self | ||
.cache() | ||
.latest_block_with_senders() | ||
.await | ||
.map_err(Self::Error::from_eth_err)? | ||
.map(|b| b.body.transactions().len())), | ||
_ => { | ||
let block_hash = match self | ||
.provider() | ||
.block_hash_for_id(block_id) | ||
.map_err(Self::Error::from_eth_err)? | ||
{ | ||
Some(block_hash) => block_hash, | ||
None => return Ok(None), | ||
}; | ||
Ok(self | ||
.cache() | ||
.get_sealed_block_with_senders(block_hash) | ||
.await | ||
.map_err(Self::Error::from_eth_err)? | ||
.map(|b| b.body.transactions().len())) | ||
} | ||
} | ||
} | ||
} | ||
|
||
|
@@ -167,7 +176,7 @@ pub trait EthBlocks: LoadBlock { | |
.get_block_and_receipts(block_hash) | ||
.await | ||
.map_err(Self::Error::from_eth_err) | ||
.map(|b| b.map(|(b, r)| (b.block.clone(), r))) | ||
.map(|b| b.map(|(b, r)| (b.block.clone(), r))); | ||
} | ||
|
||
Ok(None) | ||
|
@@ -253,19 +262,48 @@ pub trait LoadBlock: LoadPendingBlock + SpawnBlocking + RpcNodeCoreExt { | |
}; | ||
} | ||
|
||
let block_hash = match self | ||
.provider() | ||
.block_hash_for_id(block_id) | ||
.map_err(Self::Error::from_eth_err)? | ||
{ | ||
Some(block_hash) => block_hash, | ||
None => return Ok(None), | ||
}; | ||
let block = match block_id { | ||
BlockId::Number(BlockNumberOrTag::Latest) => { | ||
if let Some(block) = self | ||
.cache() | ||
.latest_block_with_senders() | ||
.await | ||
.map_err(Self::Error::from_eth_err)? | ||
{ | ||
Some(block) | ||
} else { | ||
// Fallback to traditional lookup if latest isn't cached | ||
match self | ||
.provider() | ||
.block_hash_for_id(block_id) | ||
.map_err(Self::Error::from_eth_err)? | ||
{ | ||
Some(block_hash) => self | ||
.cache() | ||
.get_sealed_block_with_senders(block_hash) | ||
.await | ||
.map_err(Self::Error::from_eth_err)?, | ||
None => None, | ||
} | ||
} | ||
} | ||
_ => { | ||
let block_hash = match self | ||
.provider() | ||
.block_hash_for_id(block_id) | ||
.map_err(Self::Error::from_eth_err)? | ||
{ | ||
Some(block_hash) => block_hash, | ||
None => return Ok(None), | ||
}; | ||
Comment on lines
+265
to
+298
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mattsse A bit stuck on the integration of the new function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can look at this later, I believe we can simplify this a bit by introducing a tmp var and then do a |
||
|
||
self.cache() | ||
.get_sealed_block_with_senders(block_hash) | ||
.await | ||
.map_err(Self::Error::from_eth_err) | ||
self.cache() | ||
.get_sealed_block_with_senders(block_hash) | ||
.await | ||
.map_err(Self::Error::from_eth_err)? | ||
} | ||
}; | ||
Ok(block) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,15 +61,10 @@ type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSend | |
/// | ||
/// This is the frontend for the async caching service which manages cached data on a different | ||
/// task. | ||
#[derive(Debug)] | ||
#[derive(Debug, Clone)] | ||
pub struct EthStateCache<B: Block, R> { | ||
to_service: UnboundedSender<CacheAction<B, R>>, | ||
} | ||
|
||
impl<B: Block, R> Clone for EthStateCache<B, R> { | ||
fn clone(&self) -> Self { | ||
Self { to_service: self.to_service.clone() } | ||
} | ||
latest_chain_change: Option<ChainChange<B, R>>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we don't need to share this and can instead rely on message passing and always fetch this from the service |
||
} | ||
|
||
impl<B: Block, R: Send + Sync> EthStateCache<B, R> { | ||
|
@@ -95,8 +90,9 @@ impl<B: Block, R: Send + Sync> EthStateCache<B, R> { | |
action_rx: UnboundedReceiverStream::new(rx), | ||
action_task_spawner, | ||
rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)), | ||
latest_chain_change: None, | ||
}; | ||
let cache = Self { to_service }; | ||
let cache = Self { to_service, latest_chain_change: None }; | ||
(cache, service) | ||
} | ||
|
||
|
@@ -186,6 +182,21 @@ impl<B: Block, R: Send + Sync> EthStateCache<B, R> { | |
let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx }); | ||
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)? | ||
} | ||
|
||
/// Returns the most recent canonical block from the cache, if available. | ||
/// Used to efficiently handle latest block requests and avoid race conditions during chain | ||
/// reorgs. | ||
/// Returns `None` if no canonical chain is tracked or during reorgs. | ||
pub async fn latest_block_with_senders( | ||
&self, | ||
) -> ProviderResult<Option<Arc<SealedBlockWithSenders<B>>>> { | ||
if let Some(chain_change) = &self.latest_chain_change { | ||
if let Some(latest_block) = chain_change.blocks.last() { | ||
return self.get_sealed_block_with_senders(latest_block.hash()).await; | ||
} | ||
} | ||
Comment on lines
+193
to
+197
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should always send an action to the service, but for fetching the latest, we'd need a new action variant |
||
Ok(None) | ||
} | ||
} | ||
|
||
/// A task than manages caches for data required by the `eth` rpc implementation. | ||
|
@@ -236,6 +247,8 @@ pub(crate) struct EthStateCacheService< | |
action_task_spawner: Tasks, | ||
/// Rate limiter | ||
rate_limiter: Arc<Semaphore>, | ||
/// Tracks latest canonical chain state for cache consistency | ||
latest_chain_change: Option<ChainChange<Provider::Block, Provider::Receipt>>, | ||
} | ||
|
||
impl<Provider, Tasks> EthStateCacheService<Provider, Tasks> | ||
|
@@ -458,7 +471,8 @@ where | |
} | ||
} | ||
CacheAction::CacheNewCanonicalChain { chain_change } => { | ||
for block in chain_change.blocks { | ||
this.latest_chain_change = Some(chain_change.clone()); | ||
for block in chain_change.clone().blocks { | ||
this.on_new_block(block.hash(), Ok(Some(Arc::new(block)))); | ||
} | ||
|
||
|
@@ -527,12 +541,14 @@ enum CacheAction<B: Block, R> { | |
}, | ||
} | ||
|
||
#[derive(Clone, Debug)] | ||
struct BlockReceipts<R> { | ||
block_hash: B256, | ||
receipts: Vec<Option<R>>, | ||
} | ||
|
||
/// A change of the canonical chain | ||
#[derive(Debug, Clone)] | ||
struct ChainChange<B: Block, R> { | ||
blocks: Vec<SealedBlockWithSenders<B>>, | ||
receipts: Vec<BlockReceipts<R>>, | ||
|
@@ -558,23 +574,28 @@ impl<B: Block, R: Clone> ChainChange<B, R> { | |
/// Awaits for new chain events and directly inserts them into the cache so they're available | ||
/// immediately before they need to be fetched from disk. | ||
/// | ||
/// Updates [`EthStateCache`] in two scenario : | ||
/// 1. On reorgs: sets `EthStateCache::latest_chain_change` to None and removes reorged blocks | ||
/// 2. On new canonical blocks: updates `EthStateCache::latest_chain_change` and caches the new | ||
/// blocks | ||
/// | ||
/// Reorged blocks are removed from the cache. | ||
pub async fn cache_new_blocks_task<St, N: NodePrimitives>( | ||
eth_state_cache: EthStateCache<N::Block, N::Receipt>, | ||
mut eth_state_cache: EthStateCache<N::Block, N::Receipt>, | ||
mut events: St, | ||
) where | ||
St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static, | ||
{ | ||
while let Some(event) = events.next().await { | ||
if let Some(reverted) = event.reverted() { | ||
let chain_change = ChainChange::new(reverted); | ||
|
||
eth_state_cache.latest_chain_change = None; | ||
let _ = | ||
eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change }); | ||
} | ||
|
||
let chain_change = ChainChange::new(event.committed()); | ||
|
||
eth_state_cache.latest_chain_change = Some(chain_change.clone()); | ||
let _ = | ||
eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change }); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can use
is_latest()
here