Skip to content

Commit

Permalink
add merklize barrier
Browse files Browse the repository at this point in the history
  • Loading branch information
nekomoto911 committed Jan 21, 2025
1 parent 61abcf1 commit c85f421
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions crates/pipe-exec-layer-ext-v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ struct Core<Storage: GravityStorage> {
chain_spec: Arc<ChainSpec>,
event_tx: std::sync::mpsc::Sender<PipeExecLayerEvent>,
execute_block_barrier: PipeBarrier<Header>,
merklize_barrier: PipeBarrier<()>,
make_canonical_barrier: PipeBarrier<B256 /* block hash */>,
}

Expand All @@ -90,7 +91,7 @@ struct PipeBarrier<Result> {
struct PipeBarrierInner<Result> {
block_number: u64,
result: Option<Result>,
waiters: Vec<oneshot::Sender<()>>,
waiters: Vec<(u64, oneshot::Sender<()>)>,
closed: bool,
}

Expand Down Expand Up @@ -121,7 +122,7 @@ impl<Result> PipeBarrier<Result> {
}

let (tx, rx) = oneshot::channel();
inner.waiters.push(tx);
inner.waiters.push((block_number, tx));
drop(inner);
rx.await.unwrap();
}
Expand All @@ -138,12 +139,14 @@ impl<Result> PipeBarrier<Result> {
assert!(inner.block_number < block_number, "{} {}", inner.block_number, block_number);
inner.block_number = block_number;
inner.result = Some(result);
let waiters = std::mem::take(&mut inner.waiters);
let waiter_to_notify = inner
.waiters
.iter()
.position(|(n, _)| *n == block_number)
.map(|i| inner.waiters.swap_remove(i).1);
drop(inner);

for tx in waiters {
let _ = tx.send(());
}
let _ = waiter_to_notify.map(|tx| tx.send(()));
Some(())
}

Expand All @@ -153,7 +156,7 @@ impl<Result> PipeBarrier<Result> {
let waiters = std::mem::take(&mut inner.waiters);
drop(inner);

for tx in waiters {
for (_, tx) in waiters {
let _ = tx.send(());
}
}
Expand All @@ -166,6 +169,7 @@ impl<Storage: GravityStorage> PipeExecService<Storage> {
Some(ordered_block) => ordered_block,
None => {
self.core.execute_block_barrier.close().await;
self.core.merklize_barrier.close().await;
self.core.make_canonical_barrier.close().await;
return;
}
Expand Down Expand Up @@ -206,8 +210,10 @@ impl<Storage: GravityStorage> Core<Storage> {
let execution_outcome = self.calculate_roots(&mut block, outcome);

// Merkling the state trie
self.merklize_barrier.wait(block_number - 1).await.unwrap();
let (state_root, hashed_state, trie_output) =
self.storage.state_root_with_updates(block_number).unwrap();
self.merklize_barrier.notify(block_number, ()).await.unwrap();
debug!(target: "PipeExecService.process",
block_number=?block_number,
block_id=?block_id,
Expand Down Expand Up @@ -466,6 +472,7 @@ pub fn new_pipe_exec_layer_api<Storage: GravityStorage>(
chain_spec,
event_tx,
execute_block_barrier: PipeBarrier::new(latest_block_number, latest_block_header),
merklize_barrier: PipeBarrier::new(latest_block_number, ()),
make_canonical_barrier: PipeBarrier::new(latest_block_number, latest_block_hash),
}),
ordered_block_rx,
Expand Down

0 comments on commit c85f421

Please sign in to comment.