Skip to content

Commit

Permalink
implement pipe-exec-layer-ext-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
nekomoto911 committed Dec 27, 2024
1 parent ac7ef84 commit 64bff39
Show file tree
Hide file tree
Showing 6 changed files with 560 additions and 4 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ members = [
"crates/payload/primitives/",
"crates/payload/validator/",
"crates/pipe-exec-layer-ext/",
"crates/pipe-exec-layer-ext-v2/",
"crates/primitives-traits/",
"crates/primitives/",
"crates/prune/prune",
Expand Down Expand Up @@ -372,6 +373,7 @@ reth-payload-builder = { path = "crates/payload/builder" }
reth-payload-primitives = { path = "crates/payload/primitives" }
reth-payload-validator = { path = "crates/payload/validator" }
reth-pipe-exec-layer-ext = { path = "crates/pipe-exec-layer-ext" }
reth-pipe-exec-layer-ext-v2 = { path = "crates/pipe-exec-layer-ext-v2" }
reth-primitives = { path = "crates/primitives", default-features = false, features = [
"std",
] }
Expand Down
1 change: 1 addition & 0 deletions crates/engine/tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ reth-tasks.workspace = true
reth-trie.workspace = true
reth-trie-parallel.workspace = true
reth-pipe-exec-layer-ext.workspace = true
reth-pipe-exec-layer-ext-v2.workspace = true

# common
futures.workspace = true
Expand Down
69 changes: 66 additions & 3 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ use std::{
time::Instant,
};
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
oneshot::error::TryRecvError,
mpsc::{self, UnboundedReceiver, UnboundedSender},
oneshot::{self, error::TryRecvError},
};
use tracing::*;

Expand All @@ -69,6 +68,9 @@ pub use config::TreeConfig;
pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
pub use reth_engine_primitives::InvalidBlockHook;
use reth_pipe_exec_layer_ext::PIPE_EXEC_LAYER_EXT;
use reth_pipe_exec_layer_ext_v2::{
PipeExecLayerEvent, PIPE_EXEC_LAYER_EXT as PIPE_EXEC_LAYER_EXT_V2,
};

/// Keeps track of the state of the tree.
///
Expand Down Expand Up @@ -624,6 +626,58 @@ where
(incoming, outgoing)
}

fn try_recv_pipe_exec_event(&self) -> Result<Option<PipeExecLayerEvent>, RecvError> {
if let Some(ext) = PIPE_EXEC_LAYER_EXT_V2.get() {
if self.persistence_state.in_progress() {
let mut waited_time_ms = 0;
loop {
match ext.event_rx.blocking_lock().try_recv() {
Ok(event) => return Ok(Some(event)),
Err(mpsc::error::TryRecvError::Empty) => {
if waited_time_ms > 500 {
// timeout
return Ok(None);
}
std::thread::sleep(std::time::Duration::from_millis(10));
waited_time_ms += 10;
}
Err(mpsc::error::TryRecvError::Disconnected) => return Err(RecvError),
}
}
} else {
let event = ext.event_rx.blocking_lock().blocking_recv();
if event.is_some() {
Ok(event)
} else {
Err(RecvError)
}
}
} else {
Ok(None)
}
}

fn on_pipe_exec_event(&mut self, event: PipeExecLayerEvent) {
match event {
PipeExecLayerEvent::InsertExecutedBlock(block, tx) => {
debug!(target: "on_pipe_exec_event", block_number = %block.block().number, block_hash = %block.block().hash(), "Received insert executed block event");
self.state.tree_state.insert_executed(block);
tx.send(()).unwrap();
}
PipeExecLayerEvent::MakeCanonical(payload, tx) => {
let block_number = payload.block_number();
let block_hash = payload.block_hash();
debug!(target: "on_pipe_exec_event", block_number = %block_number, block_hash = %block_hash, "Received make canonical event");
self.on_new_payload(payload, None).unwrap_or_else(|err| {
panic!(
"Failed to make canonical, block_number={block_number} block_hash={block_hash}: {err}",
)
});
tx.send(()).unwrap();
}
}
}

/// Returns a new [`Sender`] to send messages to this type.
pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T>>> {
self.incoming_tx.clone()
Expand All @@ -634,6 +688,15 @@ where
/// This will block the current thread and process incoming messages.
pub fn run(mut self) {
loop {
match self.try_recv_pipe_exec_event() {
Ok(Some(event)) => self.on_pipe_exec_event(event),
Ok(None) => {}
Err(RecvError) => {
error!(target: "engine::tree", "Pipe exec layer channel disconnected");
return
}
}

match self.try_recv_engine_message() {
Ok(Some(msg)) => {
debug!(target: "engine::tree", %msg, "received new engine message");
Expand Down
34 changes: 34 additions & 0 deletions crates/pipe-exec-layer-ext-v2/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[package]
name = "reth-pipe-exec-layer-ext-v2"
version.workspace = true
edition.workspace = true
homepage.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
description = "pipeline execution layer extension"

[lints]
workspace = true

[dependencies]
reth-primitives.workspace = true
reth-rpc-types.workspace = true
reth-evm-ethereum.workspace = true
reth-chainspec.workspace = true
reth-storage-api.workspace = true
reth-revm.workspace = true
reth-evm.workspace = true
reth-execution-types.workspace = true
reth-trie.workspace = true
reth-chain-state.workspace = true
reth-rpc-types-compat.workspace = true
alloy-primitives.workspace = true
tokio.workspace = true
once_cell.workspace = true

# ethereum
revm.workspace = true

# misc
tracing.workspace = true
Loading

0 comments on commit 64bff39

Please sign in to comment.